fmf_core\engine/mod.rs
1//! Multi-volume engine assembly.
2//!
3//! Owns one `VolumeIndex` per NTFS volume, drives initial scans and USN
4//! tailing threads, and answers queries with a k-way-merged, sort-ordered
5//! result set (docs/ARCHITECTURE.md). This is the layer the FFI exposes 1:1
6//! — and the layer a v2 service would host.
7
8mod results;
9mod seams;
10mod search;
11mod volume;
12#[cfg(windows)]
13mod watch;
14#[cfg(windows)]
15mod worker;
16
17#[cfg(test)]
18mod tests;
19#[cfg(all(test, windows))]
20mod worker_tests;
21
22pub use results::{ResultSet, Row};
23
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, Ordering};
27
28use parking_lot::{Mutex, RwLock};
29use thiserror::Error;
30
31use crate::index::VolumeIndex;
32use crate::metrics::MetricsHub;
33use crate::query;
34
35use volume::{JournalCheckpoint, VolumeSlot};
36
/// Settings the engine needs at construction time.
#[derive(Clone, Debug)]
pub struct EngineConfig {
    /// Directory that holds one snapshot per volume plus the `.writer.lock`
    /// single-writer guard (normally `%ProgramData%\find-my-files\`).
    pub index_dir: PathBuf,
}
44
45// The volume state is contract surface (FmfVolumeStatus.state /
46// VolumeStatusWire.state carry it as u32) — the engine uses the canonical
47// definition directly, so no wire↔engine mapping exists (ADR-0018).
48pub use fmf_contract::options::VolumeState;
49
/// Notification pushed to the event sink while volumes are scanned and
/// tailed. Every variant maps onto exactly one contract POD through
/// [`EngineEvent::to_wire`].
#[derive(Clone, Debug)]
pub enum EngineEvent {
    /// Running progress of an initial scan.
    Progress {
        /// Label of the volume being scanned (e.g. `"C:"`).
        volume: String,
        /// Number of files indexed so far.
        entries: u64,
    },
    /// The initial scan of `volume` completed; the volume now answers
    /// queries.
    VolumeReady {
        /// Label of the volume that became ready (e.g. `"C:"`).
        volume: String,
        /// Total number of files indexed when the scan completed.
        entries: u64,
    },
    /// `volume`'s index changed after USN batches were applied. Debounced on
    /// the engine side, which is the only throttle this event gets.
    IndexChanged {
        /// Label of the volume whose index changed (e.g. `"C:"`).
        volume: String,
    },
    /// A full rescan of `volume` started (for instance because its USN
    /// journal was lost).
    RescanStarted {
        /// Label of the volume being rescanned (e.g. `"C:"`).
        volume: String,
    },
    /// `volume` could not be opened or scanned.
    VolumeFailed {
        /// Label of the failed volume (e.g. `"C:"`).
        volume: String,
        /// Human-readable reason for the failure.
        message: String,
    },
    /// A WARN, ERROR or panic was recorded in the diagnostics ring. This is
    /// only the push notification; the UI pulls the details from the
    /// metrics snapshot.
    EngineError {
        /// Recorded severity: 1 = warn, 2 = error, 3 = panic.
        severity: u64,
        /// Volume label the diagnostic was attributed to; empty when none.
        volume: String,
    },
}
97
98impl EngineEvent {
99 /// The single `EngineEvent` → contract POD mapping — the FFI callback and
100 /// the pipe event push both consume this (ADR-0018: no per-boundary
101 /// kind tables).
102 #[must_use]
103 pub fn to_wire(&self) -> fmf_contract::pod::FmfEvent {
104 use fmf_contract::events::EventKind;
105 let (kind, volume, entries) = match self {
106 Self::Progress { volume, entries } => (EventKind::Progress, volume, *entries),
107 Self::VolumeReady { volume, entries } => (EventKind::VolumeReady, volume, *entries),
108 Self::IndexChanged { volume } => (EventKind::IndexChanged, volume, 0),
109 Self::RescanStarted { volume } => (EventKind::RescanStarted, volume, 0),
110 Self::VolumeFailed { volume, .. } => (EventKind::VolumeFailed, volume, 0),
111 Self::EngineError { severity, volume } => (EventKind::EngineError, volume, *severity),
112 };
113 fmf_contract::pod::FmfEvent::new(kind as u32, entries, volume)
114 }
115}
116
117/// Callback the engine invokes (from any thread) to deliver an [`EngineEvent`].
118pub type EventSink = Arc<dyn Fn(&EngineEvent) + Send + Sync>;
119
120/// A failure answering a query (parse, compile, or a stale result set).
121#[derive(Debug, Error)]
122pub enum EngineError {
123 /// The query text could not be parsed.
124 #[error("query parse: {0}")]
125 Parse(#[from] query::ParseError),
126 /// The parsed query could not be compiled.
127 #[error("query compile: {0}")]
128 Compile(#[from] query::CompileError),
129 /// The result set references an index that has since been rebuilt.
130 #[error("result is stale (index was rebuilt)")]
131 Stale,
132}
133
134/// Why `Engine::new` refused to start. `Locked` is the cross-process arm of
135/// the single-writer invariant (FFI: `FMF_E_LOCKED`, docs/ARCHITECTURE.md
136/// Pipe protocol §single-writer exclusion).
137#[derive(Debug, Error)]
138pub enum EngineCreateError {
139 #[error(
140 "index directory is locked by another engine process (holder pid: {})",
141 .0.map_or_else(|| "unknown".to_string(), |p| p.to_string())
142 )]
143 /// Another engine process already holds the writer lock (its pid if
144 /// readable). The cross-process arm of the single-writer invariant.
145 Locked(Option<u32>),
146 /// The index directory could not be created or its lock could not be
147 /// opened.
148 #[error("index directory: {0}")]
149 Io(#[from] std::io::Error),
150}
151
152/// The multi-volume engine: owns one index per NTFS volume, drives scans and
153/// USN tailing, and answers queries. Holds the single-writer lock for its
154/// whole lifetime.
155pub struct Engine {
156 config: EngineConfig,
157 sink: RwLock<Option<EventSink>>,
158 volumes: RwLock<Vec<Arc<VolumeSlot>>>,
159 threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
160 metrics: MetricsHub,
161 /// Last `(text, options) → compiled query`. An identical re-issue — a USN-
162 /// driven requery of the same text (the `RefreshInPlace` path) — then skips
163 /// parse + compile, which matters most for a heavy regex. Always sound:
164 /// the compiled query is a pure function of `(text, options)` (the date
165 /// resolver maps civil dates to ticks independently of the wall clock).
166 /// Keying on the whole `QueryOptions` over-approximates (only case + the
167 /// regex mode/scope actually steer compilation) but stays trivially
168 /// correct. Engine-wide because compilation is volume-independent.
169 compile_cache: Mutex<Option<(String, query::QueryOptions, Arc<query::CompiledQuery>)>>,
170 /// Keeps the diag→EngineError forwarding registered for our lifetime.
171 _diag_guard: Mutex<Option<crate::diag::SinkGuard>>,
172 /// Exclusive-write handle on `{index_dir}\.writer.lock` for our whole
173 /// lifetime — the OS releases it on process death, so no stale locks.
174 #[cfg(windows)]
175 _writer_lock: std::fs::File,
176}
177
178impl Engine {
179 /// Create the engine and acquire the single-writer lock on the index
180 /// directory.
181 ///
182 /// # Errors
183 ///
184 /// Returns [`EngineCreateError::Io`] if the index directory cannot be
185 /// created, or [`EngineCreateError::Locked`] if another engine process
186 /// already holds the writer lock (`FMF_E_LOCKED`).
187 pub fn new(config: EngineConfig) -> Result<Arc<Self>, EngineCreateError> {
188 std::fs::create_dir_all(&config.index_dir)?;
189 #[cfg(windows)]
190 let writer_lock = Self::acquire_writer_lock(&config.index_dir)?;
191 let engine = Arc::new(Self {
192 config,
193 sink: RwLock::new(None),
194 volumes: RwLock::new(Vec::new()),
195 threads: Mutex::new(Vec::new()),
196 metrics: MetricsHub::new(),
197 compile_cache: Mutex::new(None),
198 _diag_guard: Mutex::new(None),
199 #[cfg(windows)]
200 _writer_lock: writer_lock,
201 });
202 // Forward every diagnostics event (WARN+/panic, any thread) to the
203 // event sink as a POD EngineError — the UI fetches the message text
204 // from the metrics snapshot. Weak: the registry must not keep the
205 // engine alive.
206 let weak = Arc::downgrade(&engine);
207 let guard = crate::diag::register_sink(Arc::new(move |ev| {
208 if let Some(e) = weak.upgrade() {
209 e.emit(EngineEvent::EngineError {
210 severity: ev.severity.as_u64(),
211 volume: ev.volume.clone().unwrap_or_default(),
212 });
213 }
214 }));
215 *engine._diag_guard.lock() = Some(guard);
216 Ok(engine)
217 }
218
219 /// Cross-process single-writer guard: exclusive write access on
220 /// `.writer.lock` (readers allowed, so a losing process can report the
221 /// holder's pid). Held until drop; the OS frees it on process death.
222 #[cfg(windows)]
223 fn acquire_writer_lock(
224 index_dir: &std::path::Path,
225 ) -> Result<std::fs::File, EngineCreateError> {
226 use std::io::Write;
227 use std::os::windows::fs::OpenOptionsExt;
228 const FILE_SHARE_READ: u32 = 0x1;
229 const ERROR_SHARING_VIOLATION: i32 = 32;
230
231 let path = index_dir.join(".writer.lock");
232 match std::fs::OpenOptions::new()
233 .write(true)
234 .create(true)
235 .truncate(true)
236 .share_mode(FILE_SHARE_READ)
237 .open(&path)
238 {
239 Ok(mut f) => {
240 // Best effort — the pid is diagnostics for the loser, not state.
241 let _ = write!(f, "{}", std::process::id());
242 let _ = f.flush();
243 Ok(f)
244 }
245 Err(e) if e.raw_os_error() == Some(ERROR_SHARING_VIOLATION) => {
246 let holder = std::fs::read_to_string(&path)
247 .ok()
248 .and_then(|s| s.trim().parse::<u32>().ok());
249 Err(EngineCreateError::Locked(holder))
250 }
251 Err(e) => Err(EngineCreateError::Io(e)),
252 }
253 }
254
255 /// Install (or clear with `None`) the callback that receives every
256 /// [`EngineEvent`].
257 pub fn set_event_sink(&self, sink: Option<EventSink>) {
258 *self.sink.write() = sink;
259 }
260
261 fn emit(&self, ev: EngineEvent) {
262 if let Some(s) = self.sink.read().clone() {
263 s(&ev);
264 }
265 }
266
267 /// Begin indexing the given volumes (asynchronous; progress via events).
268 /// Idempotent per volume label: clients re-send `IndexStart` on every
269 /// (re)connect and the service also calls this at startup, so a volume
270 /// already being indexed is skipped. A duplicate slot would make every
271 /// query return that volume's rows once per copy (search merges all Ready
272 /// slots) — the source of the "each result appears N times" bug.
273 ///
274 /// # Panics
275 ///
276 /// Panics if a volume worker thread cannot be spawned.
277 pub fn index_start(self: &Arc<Self>, volumes: &[String]) {
278 for label in volumes {
279 // Trust boundary: `volumes` reaches us unvalidated — over the pipe
280 // (IndexStart op), through the FFI, and from service startup. A
281 // label must be exactly "<letter>:"; reject anything else here, the
282 // one chokepoint every caller funnels through, so a hostile request
283 // can neither spawn unbounded volume threads nor steer
284 // `snapshot_path` outside the index dir with a `..\` label. Report
285 // it as VolumeFailed — the same way the worker surfaces a volume it
286 // can't open — so the client still gets feedback (don't go silent) and we
287 // never reach the slot/thread/`snapshot_path` path with garbage.
288 if !volume::is_valid_volume_label(label) {
289 tracing::warn!(label = %label, "index_start: rejecting malformed volume label");
290 self.emit(EngineEvent::VolumeFailed {
291 volume: label.clone(),
292 message: "malformed volume label (expected \"<letter>:\")".to_string(),
293 });
294 continue;
295 }
296 // Decide-and-insert under one write lock so a concurrent
297 // index_start of the same label can't slip a second slot in.
298 let slot = {
299 let mut vols = self.volumes.write();
300 if vols.iter().any(|s| s.label == *label) {
301 continue;
302 }
303 let store = Arc::new(seams::WinSnapshotStore::new(volume::snapshot_path(
304 &self.config.index_dir,
305 label,
306 )));
307 let slot = Arc::new(VolumeSlot::scanning(label.clone(), store));
308 vols.push(slot.clone());
309 slot
310 };
311 let engine = self.clone();
312 let handle = std::thread::Builder::new()
313 .name(format!("fmf-vol-{label}"))
314 .spawn(move || engine.volume_thread(slot))
315 .expect("spawn volume thread");
316 self.threads.lock().push(handle);
317 }
318 }
319
320 /// Begin a non-elevated **scope-mode** index over `roots` (absolute base
321 /// paths), folder-walked and watched in-process without elevation
322 /// (ADR-0024). Unlike [`Self::index_start`], the change source is
323 /// `ReadDirectoryChangesW`, not the USN journal, and the snapshot lives
324 /// under a single fixed `scope` label (so a hostile `roots` entry can
325 /// never steer `snapshot_path` — only the fixed label does). Idempotent:
326 /// a second call while the scope slot exists is a no-op.
327 ///
328 /// # Panics
329 ///
330 /// Panics if the volume worker thread cannot be spawned.
331 #[cfg(windows)]
332 pub fn index_start_scope(self: &Arc<Self>, roots: &[String], excludes: &[String]) {
333 const SCOPE_LABEL: &str = "scope";
334 let roots: Vec<String> = roots
335 .iter()
336 .map(|r| r.trim().to_string())
337 .filter(|r| !r.is_empty())
338 .collect();
339 let excludes: Vec<String> = excludes
340 .iter()
341 .map(|r| r.trim().to_string())
342 .filter(|r| !r.is_empty())
343 .collect();
344 if roots.is_empty() {
345 tracing::warn!("index_start_scope: no roots configured");
346 self.emit(EngineEvent::VolumeFailed {
347 volume: SCOPE_LABEL.to_string(),
348 message: "スコープ索引のフォルダが未設定です".to_string(),
349 });
350 return;
351 }
352 let slot = {
353 let mut vols = self.volumes.write();
354 if vols.iter().any(|s| s.label == SCOPE_LABEL) {
355 return;
356 }
357 let store = Arc::new(seams::WinSnapshotStore::new(volume::snapshot_path(
358 &self.config.index_dir,
359 SCOPE_LABEL,
360 )));
361 let slot = Arc::new(VolumeSlot::scanning_walk(
362 SCOPE_LABEL.to_string(),
363 store,
364 roots,
365 excludes,
366 ));
367 vols.push(slot.clone());
368 slot
369 };
370 let engine = self.clone();
371 let handle = std::thread::Builder::new()
372 .name(format!("fmf-vol-{SCOPE_LABEL}"))
373 .spawn(move || engine.volume_thread(slot))
374 .expect("spawn volume thread");
375 self.threads.lock().push(handle);
376 }
377
378 /// Per-volume status: `(label, state, files scanned so far)`.
379 pub fn status(&self) -> Vec<(String, VolumeState, u64)> {
380 self.volumes
381 .read()
382 .iter()
383 .map(|s| (s.label.clone(), *s.phase.lock(), *s.scanned.lock()))
384 .collect()
385 }
386
387 /// The engine's metrics hub (counters and the diagnostics ring).
388 pub const fn metrics(&self) -> &MetricsHub {
389 &self.metrics
390 }
391
392 /// Per-volume memory accounting (perf panel / `fmf stats`).
393 pub fn index_stats(&self) -> Vec<crate::metrics::IndexStats> {
394 self.volumes
395 .read()
396 .iter()
397 .filter_map(|slot| {
398 slot.index.read().as_ref().map(|idx| {
399 let mut s = idx.stats(&slot.label);
400 s.add_derived_bytes(query::derived_cache_bytes(idx));
401 s
402 })
403 })
404 .collect()
405 }
406
407 /// Full observability snapshot (JSON-serializable).
408 pub fn metrics_snapshot(&self) -> crate::metrics::MetricsSnapshot {
409 self.metrics.snapshot(64, self.index_stats())
410 }
411
412 /// Persist every Ready volume whose generations moved since its last
413 /// save ("dirty"), using the tailing thread's shared checkpoint. The
414 /// checkpoint may trail the index by an in-flight batch — the USN
415 /// replay on load covers that. Returns the number of snapshots written
416 /// (failed writes are counted in `snapshot_save_failures` and excluded).
417 #[cfg(windows)]
418 pub fn flush(&self) -> usize {
419 let volumes: Vec<_> = self.volumes.read().clone();
420 let mut saved = 0;
421 for slot in volumes {
422 if *slot.phase.lock() != VolumeState::Ready {
423 continue;
424 }
425 // Checkpoint before index: a batch landing in between leaves the
426 // index newer than the checkpoint, never older.
427 let Some(ckpt) = *slot.checkpoint.lock() else {
428 continue;
429 };
430 let dirty = {
431 let guard = slot.index.read();
432 let Some(idx) = guard.as_ref() else { continue };
433 *slot.last_saved.lock()
434 != Some((idx.content_generation(), idx.structural_generation()))
435 };
436 if !dirty {
437 continue;
438 }
439 if self.save_slot(&slot, ckpt) {
440 saved += 1;
441 }
442 }
443 saved
444 }
445
446 /// Signal every volume thread to stop and join them (bounded wait).
447 pub fn shutdown(&self) {
448 // Close the diag→EngineError forwarding window first: shutdown-time
449 // WARNs (final flush, journal teardown) still reach the log and the
450 // diag ring, but no longer race the dying event sink.
451 *self._diag_guard.lock() = None;
452 for slot in self.volumes.read().iter() {
453 slot.stop.store(true, Ordering::Relaxed);
454 }
455 // Blocked journal reads return on the next volume write; joining with
456 // a bounded wait keeps shutdown prompt without CancelSynchronousIo
457 // (M2 refinement).
458 let mut threads = self.threads.lock();
459 for t in threads.drain(..) {
460 let _ = t.join();
461 }
462 }
463
464 /// Test/dev helper: register an already-built index as a Ready volume.
465 /// The zero checkpoint stands in for a journal position so `flush` can
466 /// exercise the save path on injected volumes.
467 pub fn insert_ready_volume(&self, label: &str, idx: VolumeIndex) {
468 let slot = Arc::new(VolumeSlot {
469 label: label.to_string(),
470 phase: Mutex::new(VolumeState::Ready),
471 scanned: Mutex::new(idx.live_len() as u64),
472 index: RwLock::new(Some(idx)),
473 stop: Arc::new(AtomicBool::new(false)),
474 last_query: Mutex::new(None),
475 checkpoint: Mutex::new(Some(JournalCheckpoint {
476 journal_id: 0,
477 next_usn: 0,
478 })),
479 last_saved: Mutex::new(None),
480 save_lock: Mutex::new(()),
481 store: Arc::new(seams::WinSnapshotStore::new(volume::snapshot_path(
482 &self.config.index_dir,
483 label,
484 ))),
485 kind: volume::WorkerKind::Mft,
486 });
487 self.volumes.write().push(slot);
488 }
489
490 /// Test/dev helper: swap a rebuilt index into an existing Ready volume —
491 /// the same structural replacement a journal-gone full rescan performs.
492 ///
493 /// # Panics
494 ///
495 /// Panics if no volume with the given `label` exists.
496 pub fn replace_ready_volume(&self, label: &str, idx: VolumeIndex) {
497 let volumes = self.volumes.read();
498 let slot = volumes
499 .iter()
500 .find(|s| s.label == label)
501 .expect("replace_ready_volume: unknown volume");
502 *slot.scanned.lock() = idx.live_len() as u64;
503 slot.install_index(idx);
504 }
505}