Skip to main content

fmf_core\engine/
search.rs

1use std::sync::Arc;
2
3use crate::index::{EntryId, SortKey, VolumeIndex};
4use crate::metrics::QueryTrace;
5use crate::query::{self, QueryOptions};
6
7use super::volume::{VolumeQueryCache, VolumeSlot};
8use super::{Engine, EngineError, ResultSet, VolumeState};
9
/// Whether the incremental per-volume query cache may be used.
///
/// Setting the environment variable `FMF_QUERY_CACHE` to exactly `0` turns the
/// cache off. This is an escape hatch: should a subsumption bug surface in the
/// field, users get correct results back without a rebuild. Any other value,
/// or an unset or unreadable variable, leaves the cache on. The variable is
/// read once and the answer is memoized for the life of the process.
fn query_cache_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| match std::env::var("FMF_QUERY_CACHE") {
        Ok(value) => value != "0",
        Err(_) => true,
    })
}
17
18impl Engine {
19    /// Run a query against every Ready volume and merge the per-volume,
20    /// already-sorted id lists into one ordered result set.
21    ///
22    /// Per volume, the previous result is kept (`VolumeSlot::last_query`);
23    /// when the new query provably narrows it and the index generation is
24    /// unchanged, the candidate set is the previous hits instead of the
25    /// whole index (`query::refine`) — typing one more letter costs
26    /// O(previous hits), not O(index).
27    ///
28    /// # Errors
29    ///
30    /// Returns [`EngineError::Parse`] if `text` is not a valid query, or
31    /// [`EngineError::Compile`] if a valid query fails to compile (e.g. a bad
32    /// regex term).
33    ///
34    /// # Panics
35    ///
36    /// Panics if a `Ready` volume's index is absent during the k-way merge —
37    /// an invariant the volume thread upholds (a Ready slot always holds an
38    /// index).
39    pub fn query(
40        &self,
41        text: &str,
42        opt: &QueryOptions,
43    ) -> Result<(ResultSet, QueryTrace), EngineError> {
44        let mut trace = QueryTrace {
45            query: text.to_string(),
46            ..Default::default()
47        };
48        let t_total = crate::metrics::Stage::start();
49        let mut stage = crate::metrics::Stage::start();
50
51        // Reuse the previous compile when the same (text, case) is re-issued
52        // (USN-driven requery / RefreshInPlace) — parse + compile are skipped,
53        // which is the biggest single cost for a regex query.
54        let cached = {
55            let cache = self.compile_cache.lock();
56            cache
57                .as_ref()
58                .and_then(|(t, o, q)| (t.as_str() == text && o == opt).then(|| Arc::clone(q)))
59        };
60        let compiled = if let Some(q) = cached {
61            trace.parse_us = stage.lap();
62            q
63        } else if opt.regex_mode {
64            // Whole-query regex: the entire text is one pattern — no parse,
65            // no operators (ADR-0023).
66            trace.parse_us = stage.lap();
67            let q = Arc::new(query::compile_whole_regex(text, opt.case, opt.regex_scope)?);
68            *self.compile_cache.lock() = Some((text.to_string(), *opt, Arc::clone(&q)));
69            q
70        } else {
71            let ast = query::parse(text)?;
72            trace.parse_us = stage.lap();
73            let q = Arc::new(query::compile(&ast, opt.case, &date_resolver())?);
74            *self.compile_cache.lock() = Some((text.to_string(), *opt, Arc::clone(&q)));
75            q
76        };
77        trace.driver = compiled.driver_label();
78        trace.compile_us = stage.lap();
79
80        let slots: Vec<Arc<VolumeSlot>> = self
81            .volumes
82            .read()
83            .iter()
84            .filter(|s| *s.phase.lock() == VolumeState::Ready)
85            .cloned()
86            .collect();
87
88        let mut per_volume: Vec<(Arc<VolumeSlot>, Arc<[EntryId]>, u64)> = Vec::new();
89        let mut refined = 0usize;
90        let mut all_unchanged = true;
91        for slot in &slots {
92            let guard = slot.index.read();
93            let Some(idx) = guard.as_ref() else { continue };
94            let mut cache = slot.last_query.lock();
95            let prev_ids = if query_cache_enabled() {
96                cache.as_ref().and_then(|c| {
97                    (c.content_generation == idx.content_generation()
98                        && c.structural_generation == idx.structural_generation()
99                        && query::subsumes(&c.compiled, &c.opt, &compiled, opt))
100                    .then(|| c.ids.clone())
101                })
102            } else {
103                None
104            };
105            let (r, m) = match &prev_ids {
106                Some(ids) => {
107                    refined += 1;
108                    query::refine(idx, &compiled, opt, ids)
109                }
110                None => query::search(idx, &compiled, opt),
111            };
112            trace.memo_us += m.memo_us;
113            trace.scan_us += m.scan_us;
114            trace.materialize_us += m.materialize_us;
115            trace.entries_scanned += m.entries_scanned;
116            trace.excluded_skipped += m.excluded_skipped;
117            let ids: Arc<[EntryId]> = Arc::from(r.ids);
118            // Same query text+options re-issued (USN-driven requery) with an
119            // identical id list → the screen has nothing to change. Vec
120            // equality is a memcmp and only runs when text+opt match.
121            all_unchanged &= cache
122                .as_ref()
123                .is_some_and(|c| c.text == text && c.opt == *opt && c.ids[..] == ids[..]);
124            *cache = Some(VolumeQueryCache {
125                text: text.to_string(),
126                compiled: compiled.clone(),
127                opt: *opt,
128                content_generation: r.content_generation,
129                structural_generation: r.structural_generation,
130                ids: ids.clone(),
131            });
132            drop(cache);
133            per_volume.push((slot.clone(), ids, r.structural_generation));
134        }
135        trace.volumes = per_volume.len() as u32;
136        trace.unchanged = all_unchanged && !per_volume.is_empty();
137        trace.cache = if refined == 0 {
138            "miss"
139        } else if refined == per_volume.len() {
140            "refine"
141        } else {
142            "partial"
143        }
144        .to_string();
145        stage.lap();
146
147        // K-way merge by the sort key (typically 1-3 volumes). One volume —
148        // the common setup — is a straight copy; the cursor merge below
149        // costs more than the whole scan for large result sets.
150        let total: usize = per_volume.iter().map(|(_, ids, _)| ids.len()).sum();
151        let mut rows: Vec<(u32, EntryId)> = Vec::with_capacity(total);
152        if let [(_, ids, _)] = per_volume.as_slice() {
153            rows.extend(ids.iter().map(|&id| (0u32, id)));
154        } else {
155            let guards: Vec<_> = per_volume
156                .iter()
157                .map(|(slot, _, _)| slot.index.read())
158                .collect();
159            let mut cursors: Vec<usize> = vec![0; per_volume.len()];
160            loop {
161                let mut best: Option<usize> = None;
162                for (v, (_, ids, _)) in per_volume.iter().enumerate() {
163                    if cursors[v] >= ids.len() {
164                        continue;
165                    }
166                    best = match best {
167                        None => Some(v),
168                        Some(b) => {
169                            let (ib, vb) = (per_volume[b].1[cursors[b]], b);
170                            let (iv, vv) = (ids[cursors[v]], v);
171                            let idx_b = guards[vb].as_ref().expect("active volume guard held");
172                            let idx_v = guards[vv].as_ref().expect("active volume guard held");
173                            if cmp_entries(idx_v, iv, idx_b, ib, opt) == std::cmp::Ordering::Less {
174                                Some(vv)
175                            } else {
176                                Some(vb)
177                            }
178                        }
179                    };
180                }
181                match best {
182                    Some(v) => {
183                        rows.push((v as u32, per_volume[v].1[cursors[v]]));
184                        cursors[v] += 1;
185                    }
186                    None => break,
187                }
188            }
189        }
190
191        trace.merge_us = stage.lap();
192        trace.hits = rows.len() as u64;
193        trace.total_us = t_total.elapsed_us();
194        self.metrics.record_query(trace.clone());
195
196        Ok((
197            ResultSet {
198                slots: per_volume.iter().map(|(s, _, _)| s.clone()).collect(),
199                structural: per_volume.iter().map(|(_, _, g)| *g).collect(),
200                rows,
201            },
202            trace,
203        ))
204    }
205}
206
207fn cmp_entries(
208    a_idx: &VolumeIndex,
209    a: EntryId,
210    b_idx: &VolumeIndex,
211    b: EntryId,
212    opt: &QueryOptions,
213) -> std::cmp::Ordering {
214    let ord = match opt.sort {
215        SortKey::Name => a_idx.lower_name(a).cmp(b_idx.lower_name(b)),
216        SortKey::Size => a_idx.size(a).cmp(&b_idx.size(b)),
217        SortKey::Mtime => a_idx.mtime(a).cmp(&b_idx.mtime(b)),
218    };
219    if opt.desc { ord.reverse() } else { ord }
220}
221
222#[cfg(windows)]
223fn date_resolver() -> impl query::DateResolver {
224    query::WindowsLocalResolver
225}
226#[cfg(not(windows))]
227fn date_resolver() -> impl query::DateResolver {
228    query::UtcResolver
229}