Skip to main content

mz_deploy/lsp/
server.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! LSP backend and `LanguageServer` trait implementation.
//!
//! [`Backend`] holds per-session state: open documents, compiled project
//! metadata (for go-to-definition, hover, completion, and code lens),
//! and workspace configuration.
//!
//! ## State Management
//!
//! - **`documents`** — Open document contents, updated on every `didOpen` /
//!   `didChange`.
//! - **`project_cache`** — Compiled project metadata. Opened lazily on the
//!   first rebuild that can open the cache database (whether or not the
//!   build itself succeeded); the same handle is reused across rebuilds.
//! - **`parse_diagnostics`** — Per-keystroke parse-level diagnostics, keyed by
//!   URI. Updated on every `didOpen` / `didChange`.
//! - **`project_diagnostics`** — Project-level (validation + typecheck)
//!   diagnostics, keyed by URI. Replaced by every rebuild that publishes
//!   (see `Backend::maybe_rebuild`).
//! - **`root`** — The workspace root directory.
//! - **`settings`** / **`variables`** — Project and profile configuration,
//!   reloaded at the start of every rebuild.
//!
//! ## Diagnostic Publishing
//!
//! LSP `publishDiagnostics` is full-replacement per URI, so the two sources
//! must be merged before publishing or one will overwrite the other. Both
//! diagnostic flows route through [`Backend::publish_merged`], which reads
//! both maps and emits the union.
//!
//! `Backend::maybe_rebuild` runs validation and (on a successful build)
//! typechecking inline. Both are merged into the new project-diagnostic map;
//! every URI that was previously tracked or is newly tracked is republished
//! through [`Backend::publish_merged`] so stale project diagnostics clear
//! while parse diagnostics for open documents survive.
42
43use crate::config::{ProjectSettings, read_mzprofile};
44use crate::lsp::{
45    code_action, code_lens, completion, diagnostics, document_symbol, goto_definition, hover,
46    references, semantic_tokens, workspace_symbol,
47};
48use crate::project;
49use crate::project::compiler::cache::ProjectCache;
50use crate::project::compiler::typecheck::TypeCheckError;
51use crate::project::error::{ProjectError, ValidationErrors};
52use crate::project::ir::graph;
53use crate::types;
54use ropey::Rope;
55use std::collections::{BTreeMap, BTreeSet};
56use std::ops::Deref;
57use std::path::{Path, PathBuf};
58use std::sync::Arc;
59use std::sync::atomic::{AtomicU64, Ordering};
60use std::time::Duration;
61use tokio::sync::{Mutex, RwLock};
62use tower_lsp::jsonrpc::Result;
63use tower_lsp::lsp_types::*;
64use tower_lsp::{Client, LanguageServer};
65
/// How long to wait after the last keystroke before kicking off an idle
/// rebuild. Long enough that rapid typing doesn't run repeated rebuilds,
/// short enough that the user sees diagnostics promptly after pausing.
///
/// This is the sleep duration used by `Backend::schedule_rebuild_after_idle`
/// before it checks whether its edit is still the latest one.
const IDLE_REBUILD_DEBOUNCE: Duration = Duration::from_millis(100);
70
71/// Resolve the active profile for LSP operations.
72///
73/// The language server has no CLI flags; it reads the project root's
74/// `.mzprofile`. Returns `None` if no profile is set — variable resolution,
75/// suffix lookup, and cluster normalization all degrade gracefully (no
76/// variables, no suffix), and SQL with `:variables` will surface
77/// undefined-variable diagnostics as usual.
78fn resolve_lsp_profile_name(project_root: &Path) -> Option<String> {
79    read_mzprofile(project_root).ok().flatten()
80}
81
82/// Try to open a long-lived [`ProjectCache`] (read-only SQLite connection).
83///
84/// Returns `None` if the database file doesn't exist yet or can't be opened.
85fn try_open_project_cache(
86    root: &Path,
87    profile: &str,
88    profile_suffix: Option<&str>,
89    variables: &BTreeMap<String, String>,
90) -> Option<ProjectCache> {
91    ProjectCache::open(root, profile, profile_suffix, variables)
92        .ok()
93        .flatten()
94}
95
/// LSP backend holding session state.
///
/// `Backend` is a cheap-to-clone handle around an [`Arc<BackendInner>`].
/// Cloning is the standard way to capture state in spawned tokio tasks (e.g.
/// the debounced rebuild scheduler). Field access goes through `Deref` so
/// existing `self.documents.lock()` patterns work unchanged.
#[derive(Clone)]
pub(super) struct Backend {
    /// Shared session state; every clone of the handle points at the same
    /// [`BackendInner`].
    inner: Arc<BackendInner>,
}
106
/// Session state shared by every clone of [`Backend`].
///
/// Each mutable field sits behind its own async lock or atomic, so handlers
/// lock only the piece of state they need.
pub(super) struct BackendInner {
    /// Client handle for sending notifications (e.g., diagnostics).
    client: Client,
    /// Per-file text ropes, keyed by document URI.
    documents: Mutex<BTreeMap<Url, Rope>>,
    /// Compiled project metadata for go-to-definition, hover, completion, and code lens.
    /// `None` until a rebuild manages to open the cache.
    project_cache: Mutex<Option<ProjectCache>>,
    /// Latest per-keystroke parse-level diagnostics, keyed by URI.
    parse_diagnostics: Mutex<BTreeMap<Url, Vec<Diagnostic>>>,
    /// Latest project-level (validation + typecheck) diagnostics, keyed by URI.
    project_diagnostics: Mutex<BTreeMap<Url, Vec<Diagnostic>>>,
    /// Project root directory.
    root: RwLock<PathBuf>,
    /// Cached project settings loaded from `project.toml`.
    /// `None` when loading the settings failed.
    settings: RwLock<Option<ProjectSettings>>,
    /// Cached variables from the active profile config.
    variables: RwLock<BTreeMap<String, String>>,
    /// Name of the active profile (for hover display). `None` when no profile
    /// is set in the project — variable references will surface as undefined.
    profile_name: RwLock<Option<String>>,
    /// Monotonic edit version. Bumps on every signal that should cause a
    /// rebuild (didChange, didClose, didSave, didChangeWatchedFiles,
    /// initialized).
    edit_version: AtomicU64,
    /// Highest edit version a published rebuild was based on. A rebuild only
    /// runs if `edit_version > rebuilt_through`.
    rebuilt_through: AtomicU64,
    /// Serializes rebuild execution: at most one rebuild runs at a time.
    rebuild_lock: Mutex<()>,
}
136
137impl Deref for Backend {
138    type Target = BackendInner;
139    fn deref(&self) -> &BackendInner {
140        &self.inner
141    }
142}
143
144impl Backend {
145    /// Create a new backend with the given LSP client handle and project root.
146    pub(super) fn new_with_root(client: Client, root: PathBuf) -> Self {
147        Self {
148            inner: Arc::new(BackendInner {
149                client,
150                documents: Mutex::new(BTreeMap::new()),
151                project_cache: Mutex::new(None),
152                parse_diagnostics: Mutex::new(BTreeMap::new()),
153                project_diagnostics: Mutex::new(BTreeMap::new()),
154                root: RwLock::new(root),
155                settings: RwLock::new(None),
156                variables: RwLock::new(BTreeMap::new()),
157                profile_name: RwLock::new(None),
158                edit_version: AtomicU64::new(0),
159                rebuilt_through: AtomicU64::new(0),
160                rebuild_lock: Mutex::new(()),
161            }),
162        }
163    }
164
165    /// Load project settings and variables from `project.toml`.
166    ///
167    /// Silently defaults when `project.toml` is missing (no config is valid).
168    /// Called during `initialized` and at the start of each `rebuild_project`.
169    async fn load_settings(&self) {
170        let root = self.root.read().await.clone();
171        match ProjectSettings::load(&root) {
172            Ok(ps) => {
173                let name = resolve_lsp_profile_name(&root);
174                let config = match &name {
175                    Some(n) => ps.config_for_profile(n),
176                    None => Default::default(),
177                };
178                *self.variables.write().await = config.variables.clone();
179                *self.profile_name.write().await = name;
180                *self.settings.write().await = Some(ps);
181            }
182            Err(_) => {
183                // No project.toml or parse error — use defaults.
184                *self.settings.write().await = None;
185                *self.variables.write().await = BTreeMap::new();
186                *self.profile_name.write().await = None;
187            }
188        }
189    }
190
191    /// Publish parse diagnostics for a single document.
192    ///
193    /// Updates the parse-diagnostic cache for `uri` and republishes the
194    /// merged set (parse + project) so prior project-level diagnostics for
195    /// this URI survive the per-keystroke refresh.
196    async fn publish_diagnostics(&self, uri: Url, text: &str) {
197        let rope = Rope::from_str(text);
198        let path = uri.to_file_path().ok();
199        let diags = match path.as_deref() {
200            Some(p) if p.extension().and_then(|e| e.to_str()) == Some("toml") => Vec::new(),
201            _ => {
202                let variables = self.variables.read().await.clone();
203                let profile = self.profile_name.read().await.clone();
204                diagnostics::diagnose(text, &rope, &variables, profile.as_deref())
205            }
206        };
207
208        // Store the rope for later offset conversions (go-to-definition).
209        let mut docs = self.documents.lock().await;
210        docs.insert(uri.clone(), rope);
211        drop(docs); // release before .await on client
212
213        self.parse_diagnostics
214            .lock()
215            .await
216            .insert(uri.clone(), diags);
217        self.publish_merged(uri).await;
218    }
219
220    /// Publish the union of parse and project diagnostics for `uri`.
221    ///
222    /// LSP `publishDiagnostics` is a full-replacement per URI, so we have to
223    /// resend both streams together every time either changes. Empty union →
224    /// publish an empty list (which clears any stale diagnostics on the
225    /// client).
226    async fn publish_merged(&self, uri: Url) {
227        let parse = self
228            .parse_diagnostics
229            .lock()
230            .await
231            .get(&uri)
232            .cloned()
233            .unwrap_or_default();
234        let project = self
235            .project_diagnostics
236            .lock()
237            .await
238            .get(&uri)
239            .cloned()
240            .unwrap_or_default();
241        let mut merged = parse;
242        merged.extend(project);
243        self.client.publish_diagnostics(uri, merged, None).await;
244    }
245
246    /// Snapshot the open-document map into a `FileSystem` overlay.
247    ///
248    /// Each open document's URI is converted to an absolute filesystem path
249    /// and its rope is stringified into the overlay. URIs that don't resolve
250    /// to a `file://` path (e.g. `untitled:`) are skipped.
251    ///
252    /// At save time the overlay matches disk byte-for-byte (the save just
253    /// flushed). With idle rebuilds (Phase 3), the overlay carries unsaved
254    /// edits forward into the compiler.
255    async fn build_overlay(&self) -> crate::fs::FileSystem {
256        let docs = self.documents.lock().await;
257        let mut overlay = BTreeMap::new();
258        for (uri, rope) in docs.iter() {
259            if let Ok(path) = uri.to_file_path() {
260                overlay.insert(path, rope.to_string());
261            }
262        }
263        crate::fs::FileSystem::with_overlay(overlay)
264    }
265
266    /// Snapshot document text and cursor context for a given position.
267    ///
268    /// Acquires the documents lock once and returns the full document text,
269    /// char offset, and optional dot-qualified identifier parts at the cursor.
270    /// Returns `None` if the document is not open.
271    async fn snapshot_at_position(
272        &self,
273        uri: &Url,
274        position: Position,
275    ) -> Option<(String, usize, Option<Vec<String>>)> {
276        let (byte_offset, text) = {
277            let docs = self.documents.lock().await;
278            let rope = docs.get(uri)?;
279            let offset = diagnostics::position_to_offset(position, rope)?;
280            (offset, rope.to_string())
281        };
282
283        let parts = goto_definition::find_reference_at_position(&text, byte_offset);
284        Some((text, byte_offset, parts))
285    }
286
287    /// Schedule a debounced rebuild after the next idle window.
288    ///
289    /// Each call bumps `edit_version` and spawns a task that, after
290    /// [`IDLE_REBUILD_DEBOUNCE`], calls [`maybe_rebuild`](Self::maybe_rebuild)
291    /// only if its captured version is still the latest — i.e. the user has
292    /// stopped typing. Sibling tasks from earlier keystrokes self-bail.
293    fn schedule_rebuild_after_idle(&self) {
294        let target = self.edit_version.fetch_add(1, Ordering::SeqCst) + 1;
295        let backend = self.clone();
296        mz_ore::task::spawn(|| "debounce", async move {
297            tokio::time::sleep(IDLE_REBUILD_DEBOUNCE).await;
298            if backend.edit_version.load(Ordering::SeqCst) != target {
299                // A newer keystroke has scheduled (or will schedule) its own task.
300                return;
301            }
302            backend.maybe_rebuild().await;
303        });
304    }
305
306    /// Run a rebuild if the buffer has moved past the last published version,
307    /// serializing concurrent triggers through `rebuild_lock`.
308    ///
309    /// Concurrency invariants:
310    /// - At most one rebuild executes at a time (mutex).
311    /// - Each rebuild captures `edit_version` under the lock, so its overlay
312    ///   snapshot is consistent with `started_against`.
313    /// - If the buffer changes during the rebuild, the publish step is
314    ///   skipped and `rebuilt_through` is *not* advanced — the next trigger
315    ///   will rebuild against the newer version.
316    /// - Redundant triggers (no edits since the last published rebuild)
317    ///   skip the pipeline via the `started_against <= rebuilt_through`
318    ///   check.
319    ///
320    /// Runs validation and (on success) typechecking, merges their diagnostics,
321    /// and applies the resulting publish/clear actions via the LSP client.
322    /// Opens the [`ProjectCache`] lazily on the first rebuild where the
323    /// SQLite DB file exists, regardless of whether the build itself
324    /// succeeded — this keeps hover/goto/find-references usable against the
325    /// last-known-good state even while the user has a temporary error in
326    /// their working tree.
327    async fn maybe_rebuild(&self) {
328        if self.edit_version.load(Ordering::SeqCst) <= self.rebuilt_through.load(Ordering::SeqCst) {
329            return;
330        }
331        let _guard = self.rebuild_lock.lock().await;
332        let started_against = self.edit_version.load(Ordering::SeqCst);
333        if started_against <= self.rebuilt_through.load(Ordering::SeqCst) {
334            // Another rebuild covered this version while we waited for the lock.
335            return;
336        }
337
338        self.load_settings().await;
339        let root = self.root.read().await.clone();
340        let (profile, profile_suffix, variables) = {
341            let settings_guard = self.settings.read().await;
342            match settings_guard.as_ref() {
343                Some(ps) => {
344                    let profile = resolve_lsp_profile_name(&root);
345                    let config = match &profile {
346                        Some(name) => ps.config_for_profile(name),
347                        None => Default::default(),
348                    };
349                    (
350                        profile,
351                        config.profile_suffix.clone(),
352                        config.variables.clone(),
353                    )
354                }
355                None => (None, None, BTreeMap::new()),
356            }
357        };
358
359        let fs = self.build_overlay().await;
360        let build_result = project::plan_sync(
361            &fs,
362            &root,
363            profile.as_deref(),
364            profile_suffix.as_deref(),
365            &variables,
366        );
367
368        // Extract validation diagnostics from the build result (pure).
369        let mut new_diagnostics = match &build_result {
370            Err(ProjectError::Validation(ValidationErrors { errors })) => {
371                diagnostics::validation_diagnostics(&fs, errors)
372            }
373            _ => BTreeMap::new(),
374        };
375
376        let project = match build_result {
377            Ok(p) => Some(Arc::new(p)),
378            Err(ref e) => {
379                // Validation errors already flow through `new_diagnostics`;
380                // surface non-validation build failures as a client log.
381                if !matches!(e, ProjectError::Validation(_)) {
382                    self.client
383                        .log_message(MessageType::ERROR, format!("Project build failed: {e}"))
384                        .await;
385                }
386                None
387            }
388        };
389
390        // Run typecheck only when the project compiled. Merge typecheck errors
391        // into the diagnostic map so they flow through the same publish/clear
392        // pipeline as validation errors.
393        if let Some(ref project) = project {
394            if let Some(tc_err) = self
395                .run_typecheck(
396                    Arc::clone(project),
397                    profile.as_deref().unwrap_or(""),
398                    profile_suffix.as_deref(),
399                    &variables,
400                )
401                .await
402            {
403                let candidates = {
404                    let guard = self.project_cache.lock().await;
405                    code_action::harvest_candidates(guard.as_ref())
406                };
407                let tc_diags = diagnostics::typecheck_diagnostics(&fs, &tc_err, &candidates);
408                if tc_diags.is_empty() {
409                    self.client
410                        .log_message(MessageType::ERROR, format!("Typecheck failed: {tc_err}"))
411                        .await;
412                } else {
413                    for (path, diags) in tc_diags {
414                        new_diagnostics.entry(path).or_default().extend(diags);
415                    }
416                }
417            }
418        }
419
420        // Open the long-lived ProjectCache SQLite connection the first time
421        // the DB file is present. `try_open_project_cache` returns `None` when
422        // the file doesn't exist, so it's safe to attempt this even when the
423        // build failed — we still want hover/goto to work against any
424        // previously-written rows. Done unconditionally because it's
425        // idempotent and useful even for stale rebuilds.
426        {
427            let mut guard = self.project_cache.lock().await;
428            if guard.is_none() {
429                *guard = try_open_project_cache(
430                    &root,
431                    profile.as_deref().unwrap_or(""),
432                    profile_suffix.as_deref(),
433                    &variables,
434                );
435            }
436        }
437
438        // Generation guard: drop stale results if the buffer changed during
439        // the rebuild. The next trigger will rebuild against the newer
440        // version; do not advance `rebuilt_through`.
441        if self.edit_version.load(Ordering::SeqCst) != started_against {
442            return;
443        }
444
445        // Convert path-keyed map to URI-keyed map; drop entries whose paths
446        // can't be expressed as `file://` URIs (relative paths, etc.).
447        let new_project_diags: BTreeMap<Url, Vec<Diagnostic>> = new_diagnostics
448            .into_iter()
449            .filter_map(|(path, diags)| Url::from_file_path(path).ok().map(|uri| (uri, diags)))
450            .collect();
451
452        // Swap in the new map and compute the union of old ∪ new URIs to
453        // republish: both old-only (so stale project diagnostics clear) and
454        // new (so fresh project diagnostics appear).
455        let to_republish: BTreeSet<Url> = {
456            let mut guard = self.project_diagnostics.lock().await;
457            let union: BTreeSet<Url> = guard
458                .keys()
459                .chain(new_project_diags.keys())
460                .cloned()
461                .collect();
462            *guard = new_project_diags;
463            union
464        };
465
466        for uri in to_republish {
467            self.publish_merged(uri).await;
468        }
469
470        self.rebuilt_through
471            .store(started_against, Ordering::SeqCst);
472    }
473
474    /// Run compiler-owned typechecking.
475    ///
476    /// Returns `Some(err)` on typecheck failure so the caller can convert it
477    /// into LSP diagnostics, `None` on success.
478    async fn run_typecheck(
479        &self,
480        project: Arc<graph::Project>,
481        profile: &str,
482        profile_suffix: Option<&str>,
483        variables: &BTreeMap<String, String>,
484    ) -> Option<TypeCheckError> {
485        let root = self.root.read().await.clone();
486        let types_lock = types::load_types_lock(&root).unwrap_or_default();
487        match project::compiler::typecheck::run(
488            &root,
489            profile,
490            profile_suffix,
491            variables,
492            &project,
493            types_lock,
494        ) {
495            Ok((_, _stats)) => None,
496            Err(e) => Some(e),
497        }
498    }
499}
500
501#[tower_lsp::async_trait]
502impl LanguageServer for Backend {
503    async fn initialize(&self, params: InitializeParams) -> Result<InitializeResult> {
504        if let Some(root_uri) = params.root_uri {
505            if let Ok(path) = root_uri.to_file_path() {
506                let mut root = self.root.write().await;
507                *root = path;
508            }
509        }
510
511        Ok(InitializeResult {
512            capabilities: ServerCapabilities {
513                text_document_sync: Some(TextDocumentSyncCapability::Options(
514                    TextDocumentSyncOptions {
515                        open_close: Some(true),
516                        change: Some(TextDocumentSyncKind::FULL),
517                        save: Some(TextDocumentSyncSaveOptions::SaveOptions(SaveOptions {
518                            include_text: Some(false),
519                        })),
520                        ..Default::default()
521                    },
522                )),
523                completion_provider: Some(CompletionOptions::default()),
524                definition_provider: Some(OneOf::Left(true)),
525                references_provider: Some(OneOf::Left(true)),
526                document_symbol_provider: Some(OneOf::Left(true)),
527                workspace_symbol_provider: Some(OneOf::Left(true)),
528                hover_provider: Some(HoverProviderCapability::Simple(true)),
529                code_lens_provider: Some(CodeLensOptions {
530                    resolve_provider: Some(false),
531                }),
532                code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
533                semantic_tokens_provider: Some(
534                    SemanticTokensServerCapabilities::SemanticTokensOptions(
535                        SemanticTokensOptions {
536                            legend: SemanticTokensLegend {
537                                token_types: semantic_tokens::legend_token_types(),
538                                token_modifiers: vec![],
539                            },
540                            full: Some(SemanticTokensFullOptions::Bool(true)),
541                            ..Default::default()
542                        },
543                    ),
544                ),
545                ..Default::default()
546            },
547            ..Default::default()
548        })
549    }
550
    /// Handle `initialized`: run the first project build.
    ///
    /// Both version counters start at zero, so `edit_version` is bumped
    /// first; otherwise `maybe_rebuild` would see nothing pending and return
    /// immediately.
    async fn initialized(&self, _: InitializedParams) {
        self.edit_version.fetch_add(1, Ordering::SeqCst);
        self.maybe_rebuild().await;
    }
555
    /// Handle `shutdown`. There is no cleanup to perform, so this just
    /// acknowledges the request.
    async fn shutdown(&self) -> Result<()> {
        Ok(())
    }
559
560    async fn did_open(&self, params: DidOpenTextDocumentParams) {
561        self.publish_diagnostics(params.text_document.uri, &params.text_document.text)
562            .await;
563    }
564
565    async fn did_change(&self, params: DidChangeTextDocumentParams) {
566        if let Some(change) = params.content_changes.into_iter().last() {
567            self.publish_diagnostics(params.text_document.uri, &change.text)
568                .await;
569            self.schedule_rebuild_after_idle();
570        }
571    }
572
    /// Handle `didClose`: forget the document's in-memory state and rebuild.
    ///
    /// Removing the rope means the next overlay no longer carries this
    /// file's buffer. Removing its parse diagnostics and republishing leaves
    /// only project-level diagnostics (if any) showing for the URI. The
    /// version bump makes the following `maybe_rebuild` call actually run.
    async fn did_close(&self, params: DidCloseTextDocumentParams) {
        let uri = params.text_document.uri;
        self.documents.lock().await.remove(&uri);
        self.parse_diagnostics.lock().await.remove(&uri);
        self.publish_merged(uri).await;
        self.edit_version.fetch_add(1, Ordering::SeqCst);
        self.maybe_rebuild().await;
    }
581
    /// Handle `didSave`: rebuild the project immediately (no debounce).
    ///
    /// The parameters are unused; the server asks clients not to include
    /// the saved text (`include_text: Some(false)` in `initialize`).
    async fn did_save(&self, _params: DidSaveTextDocumentParams) {
        self.edit_version.fetch_add(1, Ordering::SeqCst);
        self.maybe_rebuild().await;
    }
586
    /// Handle `didChangeWatchedFiles`: rebuild the project immediately.
    ///
    /// The list of changed files is not inspected; any notification bumps
    /// `edit_version` and triggers a full `maybe_rebuild`.
    async fn did_change_watched_files(&self, _params: DidChangeWatchedFilesParams) {
        self.edit_version.fetch_add(1, Ordering::SeqCst);
        self.maybe_rebuild().await;
    }
591
592    async fn goto_definition(
593        &self,
594        params: GotoDefinitionParams,
595    ) -> Result<Option<GotoDefinitionResponse>> {
596        let uri = params.text_document_position_params.text_document.uri;
597        let position = params.text_document_position_params.position;
598
599        let (_, _, parts) = match self.snapshot_at_position(&uri, position).await {
600            Some(s) => s,
601            None => return Ok(None),
602        };
603        let parts = match parts {
604            Some(p) => p,
605            None => return Ok(None),
606        };
607
608        let root = self.root.read().await.clone();
609        let cache_guard = self.project_cache.lock().await;
610        let cache = match cache_guard.as_ref() {
611            Some(c) => c,
612            None => return Ok(None),
613        };
614
615        let location = goto_definition::resolve_reference(&parts, &uri, &root, cache);
616        Ok(location.map(GotoDefinitionResponse::Scalar))
617    }
618
619    async fn references(&self, params: ReferenceParams) -> Result<Option<Vec<Location>>> {
620        let uri = params.text_document_position.text_document.uri;
621        let position = params.text_document_position.position;
622
623        let (_, _, parts) = match self.snapshot_at_position(&uri, position).await {
624            Some(s) => s,
625            None => return Ok(None),
626        };
627        let parts = match parts {
628            Some(p) => p,
629            None => return Ok(None),
630        };
631
632        let root = self.root.read().await.clone();
633        let cache_guard = self.project_cache.lock().await;
634        let cache = match cache_guard.as_ref() {
635            Some(c) => c,
636            None => return Ok(None),
637        };
638
639        let locations = references::find_references(
640            &parts,
641            &uri,
642            &root,
643            cache,
644            params.context.include_declaration,
645        );
646        if locations.is_empty() {
647            Ok(None)
648        } else {
649            Ok(Some(locations))
650        }
651    }
652
653    async fn document_symbol(
654        &self,
655        params: DocumentSymbolParams,
656    ) -> Result<Option<DocumentSymbolResponse>> {
657        let file_uri = params.text_document.uri;
658        let root = self.root.read().await.clone();
659
660        let cache_guard = self.project_cache.lock().await;
661        let cache = match cache_guard.as_ref() {
662            Some(c) => c,
663            None => return Ok(None),
664        };
665
666        let symbols = document_symbol::document_symbols(&file_uri, &root, cache);
667        if symbols.is_empty() {
668            Ok(None)
669        } else {
670            Ok(Some(DocumentSymbolResponse::Nested(symbols)))
671        }
672    }
673
674    async fn symbol(
675        &self,
676        params: WorkspaceSymbolParams,
677    ) -> Result<Option<Vec<SymbolInformation>>> {
678        let root = self.root.read().await.clone();
679        let cache_guard = self.project_cache.lock().await;
680        let cache = match cache_guard.as_ref() {
681            Some(c) => c,
682            None => return Ok(None),
683        };
684
685        let symbols = workspace_symbol::workspace_symbols(&params.query, cache, &root);
686        if symbols.is_empty() {
687            Ok(None)
688        } else {
689            Ok(Some(symbols))
690        }
691    }
692
693    async fn hover(&self, params: HoverParams) -> Result<Option<Hover>> {
694        let uri = params.text_document_position_params.text_document.uri;
695        let position = params.text_document_position_params.position;
696
697        let (text, byte_offset, parts) = match self.snapshot_at_position(&uri, position).await {
698            Some(s) => s,
699            None => return Ok(None),
700        };
701
702        // Try variable hover first (pure).
703        let variables = self.variables.read().await;
704        let profile = self.profile_name.read().await;
705        if let Some(h) = hover::resolve_variable_hover(&text, byte_offset, &variables) {
706            return Ok(Some(h));
707        }
708        drop(variables);
709        drop(profile);
710
711        // Then object hover (pure).
712        let parts = match parts {
713            Some(p) => p,
714            None => return Ok(None),
715        };
716
717        let root = self.root.read().await.clone();
718        let cache_guard = self.project_cache.lock().await;
719        let cache = match cache_guard.as_ref() {
720            Some(c) => c,
721            None => return Ok(None),
722        };
723
724        let types_lock = types::load_types_lock(&root).unwrap_or_default();
725
726        Ok(hover::resolve_hover(
727            &parts,
728            &uri,
729            &root,
730            cache,
731            &types_lock,
732        ))
733    }
734
735    async fn completion(&self, params: CompletionParams) -> Result<Option<CompletionResponse>> {
736        let file_uri = params.text_document_position.text_document.uri;
737        let position = params.text_document_position.position;
738        let root = self.root.read().await.clone();
739
740        let doc_text = {
741            let docs = self.documents.lock().await;
742            docs.get(&file_uri).map(|rope| rope.to_string())
743        };
744        let text = doc_text.as_deref().unwrap_or("");
745        let prefix = completion::prefix_context(text, position);
746
747        let cache_guard = self.project_cache.lock().await;
748        let types_lock = types::load_types_lock(&root).unwrap_or_default();
749        let items =
750            completion::complete(cache_guard.as_ref(), &types_lock, &file_uri, &root, &prefix);
751
752        Ok(Some(CompletionResponse::Array(items)))
753    }
754
755    async fn code_lens(&self, params: CodeLensParams) -> Result<Option<Vec<CodeLens>>> {
756        let file_uri = params.text_document.uri;
757        let root = self.root.read().await.clone();
758
759        let doc_text = {
760            let docs = self.documents.lock().await;
761            docs.get(&file_uri).map(|rope| rope.to_string())
762        };
763        let text = match doc_text.as_deref() {
764            Some(t) => t,
765            None => return Ok(None),
766        };
767
768        let cache_guard = self.project_cache.lock().await;
769        let lenses = code_lens::code_lenses(&file_uri, text, &root, cache_guard.as_ref());
770        Ok(Some(lenses))
771    }
772
773    async fn code_action(&self, params: CodeActionParams) -> Result<Option<CodeActionResponse>> {
774        let actions = code_action::build_code_actions(&params);
775        if actions.is_empty() {
776            Ok(None)
777        } else {
778            Ok(Some(actions))
779        }
780    }
781
782    async fn semantic_tokens_full(
783        &self,
784        params: SemanticTokensParams,
785    ) -> Result<Option<SemanticTokensResult>> {
786        let file_uri = params.text_document.uri;
787
788        let doc_text = {
789            let docs = self.documents.lock().await;
790            docs.get(&file_uri).map(|rope| rope.to_string())
791        };
792        let text = match doc_text.as_deref() {
793            Some(t) => t,
794            None => return Ok(None),
795        };
796
797        let data = semantic_tokens::compute_semantic_tokens(text);
798        Ok(Some(SemanticTokensResult::Tokens(SemanticTokens {
799            result_id: None,
800            data,
801        })))
802    }
803}
804
805#[cfg(test)]
806mod tests {
807    use super::*;
808    use std::sync::Mutex as StdMutex;
809
810    fn capture_client_with_root(root: PathBuf) -> (Client, tower_lsp::LspService<Backend>) {
811        let captured_client: Arc<StdMutex<Option<Client>>> = Arc::new(StdMutex::new(None));
812        let captured_client_clone = Arc::clone(&captured_client);
813        let (service, _socket) = tower_lsp::LspService::new(move |client| {
814            *captured_client_clone.lock().unwrap() = Some(client.clone());
815            Backend::new_with_root(client, root.clone())
816        });
817        let client = captured_client.lock().unwrap().take().unwrap();
818        (client, service)
819    }
820
821    fn write_project_toml(root: &Path) {
822        std::fs::write(root.join("project.toml"), "[project]\nname = \"test\"\n").unwrap();
823    }
824
825    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
826    #[mz_ore::test]
827    fn try_open_project_cache_returns_none_for_missing_db() {
828        let result = try_open_project_cache(
829            Path::new("/nonexistent/path"),
830            "default",
831            None,
832            &BTreeMap::new(),
833        );
834        assert!(result.is_none());
835    }
836
837    /// Regression test for the tokio::sync lock conversion.
838    ///
839    /// `publish_diagnostics` acquires `documents.lock()` and then holds the
840    /// lock across an `.await` on `client.publish_diagnostics(...)`. With the
841    /// previous `std::sync::Mutex` a second concurrent call from another task
842    /// would deadlock, because the std guard is not `Send` and blocks the
843    /// worker thread. With `tokio::sync::Mutex` the second task yields
844    /// correctly and both calls complete.
845    ///
846    /// The multi-thread runtime with 2 workers is required so the two spawned
847    /// tasks can actually make progress concurrently.
848    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
849    #[mz_ore::test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
850    async fn concurrent_publish_diagnostics_do_not_deadlock() {
851        let (client, _service) = capture_client_with_root(std::env::temp_dir());
852
853        let backend = Arc::new(Backend::new_with_root(client, std::env::temp_dir()));
854
855        let b1 = Arc::clone(&backend);
856        let b2 = Arc::clone(&backend);
857        let t1 = mz_ore::task::spawn(|| "lsp-test-publish-a", async move {
858            b1.publish_diagnostics(
859                Url::from_file_path(std::env::temp_dir().join("a.sql")).unwrap(),
860                "SELECT 1;",
861            )
862            .await;
863        });
864        let t2 = mz_ore::task::spawn(|| "lsp-test-publish-b", async move {
865            b2.publish_diagnostics(
866                Url::from_file_path(std::env::temp_dir().join("b.sql")).unwrap(),
867                "SELECT 2;",
868            )
869            .await;
870        });
871        let result = tokio::time::timeout(Duration::from_secs(2), async {
872            let _ = tokio::join!(t1, t2);
873        })
874        .await;
875        assert!(result.is_ok(), "concurrent publish_diagnostics timed out");
876    }
877
878    #[mz_ore::test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
879    async fn code_action_returns_quickfix_for_unknown_column_diagnostic() {
880        use crate::lsp::code_action::QuickFixData;
881        use tower_lsp::lsp_types::{
882            CodeActionContext, CodeActionKind, CodeActionOrCommand, CodeActionParams, Diagnostic,
883            DiagnosticSeverity, PartialResultParams, Position, Range, TextDocumentIdentifier,
884            WorkDoneProgressParams,
885        };
886
887        let (client, _service) = capture_client_with_root(std::env::temp_dir());
888        let backend = Backend::new_with_root(client, std::env::temp_dir());
889
890        let uri = Url::from_file_path(std::env::temp_dir().join("qf.sql")).unwrap();
891        let qf = QuickFixData {
892            suggestions: vec![code_action::SuggestionData {
893                label: "did you mean `customer_name`?".to_string(),
894                alternatives: vec![code_action::ReplacementData {
895                    range: Range::new(Position::new(0, 7), Position::new(0, 20)),
896                    new_text: "customer_name".to_string(),
897                }],
898            }],
899        };
900        let diag = Diagnostic {
901            range: Range::new(Position::new(0, 7), Position::new(0, 20)),
902            severity: Some(DiagnosticSeverity::ERROR),
903            source: Some("mz-deploy".to_string()),
904            message: "column custoser_name does not exist".to_string(),
905            data: Some(serde_json::to_value(qf).unwrap()),
906            ..Default::default()
907        };
908
909        let params = CodeActionParams {
910            text_document: TextDocumentIdentifier { uri: uri.clone() },
911            range: diag.range,
912            context: CodeActionContext {
913                diagnostics: vec![diag],
914                only: None,
915                trigger_kind: None,
916            },
917            work_done_progress_params: WorkDoneProgressParams::default(),
918            partial_result_params: PartialResultParams::default(),
919        };
920
921        let response = backend.code_action(params).await.unwrap();
922        let actions = response.expect("code_action should return Some");
923        assert_eq!(actions.len(), 1);
924        let CodeActionOrCommand::CodeAction(ca) = &actions[0] else {
925            panic!("expected CodeAction");
926        };
927        assert_eq!(ca.kind.as_ref(), Some(&CodeActionKind::QUICKFIX));
928        assert_eq!(ca.is_preferred, Some(true));
929        let edits = ca
930            .edit
931            .as_ref()
932            .unwrap()
933            .changes
934            .as_ref()
935            .unwrap()
936            .get(&uri)
937            .unwrap();
938        assert_eq!(edits[0].new_text, "customer_name");
939    }
940
941    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
942    #[mz_ore::test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
943    async fn did_close_rebuilds_immediately_against_disk_state() {
944        let root = tempfile::tempdir().unwrap();
945        let models = root.path().join("models/mydb/public");
946        std::fs::create_dir_all(&models).unwrap();
947        std::fs::write(models.join("foo.sql"), "CREATE VIEW foo AS SELECT 1 AS id;").unwrap();
948        write_project_toml(root.path());
949
950        let (client, _service) = capture_client_with_root(root.path().to_path_buf());
951        let backend = Backend::new_with_root(client, root.path().to_path_buf());
952        let uri = Url::from_file_path(models.join("foo.sql")).unwrap();
953
954        backend
955            .publish_diagnostics(uri.clone(), "CREATE VIEW foo AS SELECT * FROM missing;")
956            .await;
957        backend.edit_version.fetch_add(1, Ordering::SeqCst);
958        backend.maybe_rebuild().await;
959
960        assert!(
961            backend.project_diagnostics.lock().await.contains_key(&uri),
962            "unsaved overlay should have produced project diagnostics"
963        );
964
965        backend
966            .did_close(DidCloseTextDocumentParams {
967                text_document: TextDocumentIdentifier { uri: uri.clone() },
968            })
969            .await;
970
971        assert!(!backend.documents.lock().await.contains_key(&uri));
972        assert!(!backend.parse_diagnostics.lock().await.contains_key(&uri));
973        assert!(!backend.project_diagnostics.lock().await.contains_key(&uri));
974        assert_eq!(
975            backend.edit_version.load(Ordering::SeqCst),
976            backend.rebuilt_through.load(Ordering::SeqCst),
977            "close should rebuild immediately instead of leaving stale overlay state behind"
978        );
979    }
980}