fmf_core\engine/
search.rs1use std::sync::Arc;
2
3use crate::index::{EntryId, SortKey, VolumeIndex};
4use crate::metrics::QueryTrace;
5use crate::query::{self, QueryOptions};
6
7use super::volume::{VolumeQueryCache, VolumeSlot};
8use super::{Engine, EngineError, ResultSet, VolumeState};
9
/// Reports whether per-volume result reuse (`query::refine`) may be attempted.
///
/// The decision is read from the `FMF_QUERY_CACHE` environment variable on first call and
/// memoised in a process-wide `OnceLock`. Only the exact value `"0"` disables reuse. An unset
/// variable, a value that is not valid Unicode, or any other value leaves it enabled.
fn query_cache_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| {
        let disabled = matches!(std::env::var("FMF_QUERY_CACHE").as_deref(), Ok("0"));
        !disabled
    })
}
17
18impl Engine {
19 pub fn query(
40 &self,
41 text: &str,
42 opt: &QueryOptions,
43 ) -> Result<(ResultSet, QueryTrace), EngineError> {
44 let mut trace = QueryTrace {
45 query: text.to_string(),
46 ..Default::default()
47 };
48 let t_total = crate::metrics::Stage::start();
49 let mut stage = crate::metrics::Stage::start();
50
51 let cached = {
55 let cache = self.compile_cache.lock();
56 cache
57 .as_ref()
58 .and_then(|(t, o, q)| (t.as_str() == text && o == opt).then(|| Arc::clone(q)))
59 };
60 let compiled = if let Some(q) = cached {
61 trace.parse_us = stage.lap();
62 q
63 } else if opt.regex_mode {
64 trace.parse_us = stage.lap();
67 let q = Arc::new(query::compile_whole_regex(text, opt.case, opt.regex_scope)?);
68 *self.compile_cache.lock() = Some((text.to_string(), *opt, Arc::clone(&q)));
69 q
70 } else {
71 let ast = query::parse(text)?;
72 trace.parse_us = stage.lap();
73 let q = Arc::new(query::compile(&ast, opt.case, &date_resolver())?);
74 *self.compile_cache.lock() = Some((text.to_string(), *opt, Arc::clone(&q)));
75 q
76 };
77 trace.driver = compiled.driver_label();
78 trace.compile_us = stage.lap();
79
80 let slots: Vec<Arc<VolumeSlot>> = self
81 .volumes
82 .read()
83 .iter()
84 .filter(|s| *s.phase.lock() == VolumeState::Ready)
85 .cloned()
86 .collect();
87
88 let mut per_volume: Vec<(Arc<VolumeSlot>, Arc<[EntryId]>, u64)> = Vec::new();
89 let mut refined = 0usize;
90 let mut all_unchanged = true;
91 for slot in &slots {
92 let guard = slot.index.read();
93 let Some(idx) = guard.as_ref() else { continue };
94 let mut cache = slot.last_query.lock();
95 let prev_ids = if query_cache_enabled() {
96 cache.as_ref().and_then(|c| {
97 (c.content_generation == idx.content_generation()
98 && c.structural_generation == idx.structural_generation()
99 && query::subsumes(&c.compiled, &c.opt, &compiled, opt))
100 .then(|| c.ids.clone())
101 })
102 } else {
103 None
104 };
105 let (r, m) = match &prev_ids {
106 Some(ids) => {
107 refined += 1;
108 query::refine(idx, &compiled, opt, ids)
109 }
110 None => query::search(idx, &compiled, opt),
111 };
112 trace.memo_us += m.memo_us;
113 trace.scan_us += m.scan_us;
114 trace.materialize_us += m.materialize_us;
115 trace.entries_scanned += m.entries_scanned;
116 trace.excluded_skipped += m.excluded_skipped;
117 let ids: Arc<[EntryId]> = Arc::from(r.ids);
118 all_unchanged &= cache
122 .as_ref()
123 .is_some_and(|c| c.text == text && c.opt == *opt && c.ids[..] == ids[..]);
124 *cache = Some(VolumeQueryCache {
125 text: text.to_string(),
126 compiled: compiled.clone(),
127 opt: *opt,
128 content_generation: r.content_generation,
129 structural_generation: r.structural_generation,
130 ids: ids.clone(),
131 });
132 drop(cache);
133 per_volume.push((slot.clone(), ids, r.structural_generation));
134 }
135 trace.volumes = per_volume.len() as u32;
136 trace.unchanged = all_unchanged && !per_volume.is_empty();
137 trace.cache = if refined == 0 {
138 "miss"
139 } else if refined == per_volume.len() {
140 "refine"
141 } else {
142 "partial"
143 }
144 .to_string();
145 stage.lap();
146
147 let total: usize = per_volume.iter().map(|(_, ids, _)| ids.len()).sum();
151 let mut rows: Vec<(u32, EntryId)> = Vec::with_capacity(total);
152 if let [(_, ids, _)] = per_volume.as_slice() {
153 rows.extend(ids.iter().map(|&id| (0u32, id)));
154 } else {
155 let guards: Vec<_> = per_volume
156 .iter()
157 .map(|(slot, _, _)| slot.index.read())
158 .collect();
159 let mut cursors: Vec<usize> = vec![0; per_volume.len()];
160 loop {
161 let mut best: Option<usize> = None;
162 for (v, (_, ids, _)) in per_volume.iter().enumerate() {
163 if cursors[v] >= ids.len() {
164 continue;
165 }
166 best = match best {
167 None => Some(v),
168 Some(b) => {
169 let (ib, vb) = (per_volume[b].1[cursors[b]], b);
170 let (iv, vv) = (ids[cursors[v]], v);
171 let idx_b = guards[vb].as_ref().expect("active volume guard held");
172 let idx_v = guards[vv].as_ref().expect("active volume guard held");
173 if cmp_entries(idx_v, iv, idx_b, ib, opt) == std::cmp::Ordering::Less {
174 Some(vv)
175 } else {
176 Some(vb)
177 }
178 }
179 };
180 }
181 match best {
182 Some(v) => {
183 rows.push((v as u32, per_volume[v].1[cursors[v]]));
184 cursors[v] += 1;
185 }
186 None => break,
187 }
188 }
189 }
190
191 trace.merge_us = stage.lap();
192 trace.hits = rows.len() as u64;
193 trace.total_us = t_total.elapsed_us();
194 self.metrics.record_query(trace.clone());
195
196 Ok((
197 ResultSet {
198 slots: per_volume.iter().map(|(s, _, _)| s.clone()).collect(),
199 structural: per_volume.iter().map(|(_, _, g)| *g).collect(),
200 rows,
201 },
202 trace,
203 ))
204 }
205}
206
207fn cmp_entries(
208 a_idx: &VolumeIndex,
209 a: EntryId,
210 b_idx: &VolumeIndex,
211 b: EntryId,
212 opt: &QueryOptions,
213) -> std::cmp::Ordering {
214 let ord = match opt.sort {
215 SortKey::Name => a_idx.lower_name(a).cmp(b_idx.lower_name(b)),
216 SortKey::Size => a_idx.size(a).cmp(&b_idx.size(b)),
217 SortKey::Mtime => a_idx.mtime(a).cmp(&b_idx.mtime(b)),
218 };
219 if opt.desc { ord.reverse() } else { ord }
220}
221
222#[cfg(windows)]
223fn date_resolver() -> impl query::DateResolver {
224 query::WindowsLocalResolver
225}
226#[cfg(not(windows))]
227fn date_resolver() -> impl query::DateResolver {
228 query::UtcResolver
229}