Skip to main content

fmf_core\engine/
mod.rs

1//! Multi-volume engine assembly.
2//!
3//! Owns one `VolumeIndex` per NTFS volume, drives initial scans and USN
4//! tailing threads, and answers queries with a k-way-merged, sort-ordered
5//! result set (docs/ARCHITECTURE.md). This is the layer the FFI exposes 1:1
6//! — and the layer a v2 service would host.
7
8mod results;
9mod seams;
10mod search;
11mod volume;
12#[cfg(windows)]
13mod watch;
14#[cfg(windows)]
15mod worker;
16
17#[cfg(test)]
18mod tests;
19#[cfg(all(test, windows))]
20mod worker_tests;
21
22pub use results::{ResultSet, Row};
23
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, Ordering};
27
28use parking_lot::{Mutex, RwLock};
29use thiserror::Error;
30
31use crate::index::VolumeIndex;
32use crate::metrics::MetricsHub;
33use crate::query;
34
35use volume::{JournalCheckpoint, VolumeSlot};
36
/// Configuration handed to [`Engine::new`].
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Directory holding the per-volume snapshots and the `.writer.lock`
    /// file (`%ProgramData%\find-my-files\`). `Engine::new` creates it if it
    /// does not exist yet.
    pub index_dir: PathBuf,
}
44
45// The volume state is contract surface (FmfVolumeStatus.state /
46// VolumeStatusWire.state carry it as u32) — the engine uses the canonical
47// definition directly, so no wire↔engine mapping exists (ADR-0018).
48pub use fmf_contract::options::VolumeState;
49
/// Asynchronous notification delivered to the event sink while volumes are
/// scanned and tailed.
///
/// [`EngineEvent::to_wire`] converts every variant into the contract's
/// `FmfEvent` POD (event kind, one numeric value, volume label). The
/// `message` of [`EngineEvent::VolumeFailed`] is not forwarded by that
/// conversion.
#[derive(Debug, Clone)]
pub enum EngineEvent {
    /// Progress of `volume`'s initial scan.
    Progress {
        /// Volume label, e.g. `"C:"`.
        volume: String,
        /// Running count of files indexed so far.
        entries: u64,
    },
    /// The initial scan of `volume` has completed; the volume can now be
    /// queried.
    VolumeReady {
        /// Volume label, e.g. `"C:"`.
        volume: String,
        /// Total number of files indexed when the scan finished.
        entries: u64,
    },
    /// Emitted after USN batches are applied (debounced; the throttle lives
    /// on the engine side only).
    IndexChanged {
        /// Volume label, e.g. `"C:"`.
        volume: String,
    },
    /// A full rescan of `volume` has started (for example because the USN
    /// journal was lost).
    RescanStarted {
        /// Volume label, e.g. `"C:"`.
        volume: String,
    },
    /// `volume` could not be opened or scanned.
    VolumeFailed {
        /// Volume label, e.g. `"C:"`.
        volume: String,
        /// Human-readable explanation of the failure.
        message: String,
    },
    /// A WARN, ERROR or panic was recorded in the diagnostics ring. This is
    /// only the push notification; the UI pulls the details from the
    /// metrics snapshot.
    EngineError {
        /// Severity as recorded in the diagnostics ring:
        /// 1 = warn, 2 = error, 3 = panic.
        severity: u64,
        /// Label of the volume the diagnostic was attributed to; empty when
        /// it was not attributed to any volume.
        volume: String,
    },
}
97
98impl EngineEvent {
99    /// The single `EngineEvent` → contract POD mapping — the FFI callback and
100    /// the pipe event push both consume this (ADR-0018: no per-boundary
101    /// kind tables).
102    #[must_use]
103    pub fn to_wire(&self) -> fmf_contract::pod::FmfEvent {
104        use fmf_contract::events::EventKind;
105        let (kind, volume, entries) = match self {
106            Self::Progress { volume, entries } => (EventKind::Progress, volume, *entries),
107            Self::VolumeReady { volume, entries } => (EventKind::VolumeReady, volume, *entries),
108            Self::IndexChanged { volume } => (EventKind::IndexChanged, volume, 0),
109            Self::RescanStarted { volume } => (EventKind::RescanStarted, volume, 0),
110            Self::VolumeFailed { volume, .. } => (EventKind::VolumeFailed, volume, 0),
111            Self::EngineError { severity, volume } => (EventKind::EngineError, volume, *severity),
112        };
113        fmf_contract::pod::FmfEvent::new(kind as u32, entries, volume)
114    }
115}
116
117/// Callback the engine invokes (from any thread) to deliver an [`EngineEvent`].
118pub type EventSink = Arc<dyn Fn(&EngineEvent) + Send + Sync>;
119
120/// A failure answering a query (parse, compile, or a stale result set).
121#[derive(Debug, Error)]
122pub enum EngineError {
123    /// The query text could not be parsed.
124    #[error("query parse: {0}")]
125    Parse(#[from] query::ParseError),
126    /// The parsed query could not be compiled.
127    #[error("query compile: {0}")]
128    Compile(#[from] query::CompileError),
129    /// The result set references an index that has since been rebuilt.
130    #[error("result is stale (index was rebuilt)")]
131    Stale,
132}
133
134/// Why `Engine::new` refused to start. `Locked` is the cross-process arm of
135/// the single-writer invariant (FFI: `FMF_E_LOCKED`, docs/ARCHITECTURE.md
136/// Pipe protocol §single-writer exclusion).
137#[derive(Debug, Error)]
138pub enum EngineCreateError {
139    #[error(
140        "index directory is locked by another engine process (holder pid: {})",
141        .0.map_or_else(|| "unknown".to_string(), |p| p.to_string())
142    )]
143    /// Another engine process already holds the writer lock (its pid if
144    /// readable). The cross-process arm of the single-writer invariant.
145    Locked(Option<u32>),
146    /// The index directory could not be created or its lock could not be
147    /// opened.
148    #[error("index directory: {0}")]
149    Io(#[from] std::io::Error),
150}
151
/// The multi-volume engine: owns one index per NTFS volume (plus, in scope
/// mode, a single folder-walked `scope` slot), drives scans and USN tailing,
/// and answers queries. On Windows it holds the single-writer lock for its
/// whole lifetime.
///
/// Fields drop in declaration order, so the diag registration
/// (`_diag_guard`) is released before the writer lock (`_writer_lock`).
pub struct Engine {
    /// Startup configuration; `index_dir` roots every snapshot path.
    config: EngineConfig,
    /// Event callback installed by `set_event_sink`; `None` means events are
    /// discarded.
    sink: RwLock<Option<EventSink>>,
    /// One slot per indexed volume label; `index_start` refuses to add a
    /// second slot for a label that already has one.
    volumes: RwLock<Vec<Arc<VolumeSlot>>>,
    /// Join handles of the spawned volume worker threads; `shutdown` drains
    /// and joins them.
    threads: Mutex<Vec<std::thread::JoinHandle<()>>>,
    /// Counters and the diagnostics ring, exposed through `metrics()` and
    /// `metrics_snapshot()`.
    metrics: MetricsHub,
    /// Last `(text, options) → compiled query`. An identical re-issue — a USN-
    /// driven requery of the same text (the `RefreshInPlace` path) — then skips
    /// parse + compile, which matters most for a heavy regex. Always sound:
    /// the compiled query is a pure function of `(text, options)` (the date
    /// resolver maps civil dates to ticks independently of the wall clock).
    /// Keying on the whole `QueryOptions` over-approximates (only case + the
    /// regex mode/scope actually steer compilation) but stays trivially
    /// correct. Engine-wide because compilation is volume-independent.
    compile_cache: Mutex<Option<(String, query::QueryOptions, Arc<query::CompiledQuery>)>>,
    /// Keeps the diag→EngineError forwarding registered; set in `new` and
    /// cleared early by `shutdown`.
    _diag_guard: Mutex<Option<crate::diag::SinkGuard>>,
    /// Exclusive-write handle on `{index_dir}\.writer.lock` for our whole
    /// lifetime — the OS releases it on process death, so no stale locks.
    #[cfg(windows)]
    _writer_lock: std::fs::File,
}
177
178impl Engine {
179    /// Create the engine and acquire the single-writer lock on the index
180    /// directory.
181    ///
182    /// # Errors
183    ///
184    /// Returns [`EngineCreateError::Io`] if the index directory cannot be
185    /// created, or [`EngineCreateError::Locked`] if another engine process
186    /// already holds the writer lock (`FMF_E_LOCKED`).
187    pub fn new(config: EngineConfig) -> Result<Arc<Self>, EngineCreateError> {
188        std::fs::create_dir_all(&config.index_dir)?;
189        #[cfg(windows)]
190        let writer_lock = Self::acquire_writer_lock(&config.index_dir)?;
191        let engine = Arc::new(Self {
192            config,
193            sink: RwLock::new(None),
194            volumes: RwLock::new(Vec::new()),
195            threads: Mutex::new(Vec::new()),
196            metrics: MetricsHub::new(),
197            compile_cache: Mutex::new(None),
198            _diag_guard: Mutex::new(None),
199            #[cfg(windows)]
200            _writer_lock: writer_lock,
201        });
202        // Forward every diagnostics event (WARN+/panic, any thread) to the
203        // event sink as a POD EngineError — the UI fetches the message text
204        // from the metrics snapshot. Weak: the registry must not keep the
205        // engine alive.
206        let weak = Arc::downgrade(&engine);
207        let guard = crate::diag::register_sink(Arc::new(move |ev| {
208            if let Some(e) = weak.upgrade() {
209                e.emit(EngineEvent::EngineError {
210                    severity: ev.severity.as_u64(),
211                    volume: ev.volume.clone().unwrap_or_default(),
212                });
213            }
214        }));
215        *engine._diag_guard.lock() = Some(guard);
216        Ok(engine)
217    }
218
219    /// Cross-process single-writer guard: exclusive write access on
220    /// `.writer.lock` (readers allowed, so a losing process can report the
221    /// holder's pid). Held until drop; the OS frees it on process death.
222    #[cfg(windows)]
223    fn acquire_writer_lock(
224        index_dir: &std::path::Path,
225    ) -> Result<std::fs::File, EngineCreateError> {
226        use std::io::Write;
227        use std::os::windows::fs::OpenOptionsExt;
228        const FILE_SHARE_READ: u32 = 0x1;
229        const ERROR_SHARING_VIOLATION: i32 = 32;
230
231        let path = index_dir.join(".writer.lock");
232        match std::fs::OpenOptions::new()
233            .write(true)
234            .create(true)
235            .truncate(true)
236            .share_mode(FILE_SHARE_READ)
237            .open(&path)
238        {
239            Ok(mut f) => {
240                // Best effort — the pid is diagnostics for the loser, not state.
241                let _ = write!(f, "{}", std::process::id());
242                let _ = f.flush();
243                Ok(f)
244            }
245            Err(e) if e.raw_os_error() == Some(ERROR_SHARING_VIOLATION) => {
246                let holder = std::fs::read_to_string(&path)
247                    .ok()
248                    .and_then(|s| s.trim().parse::<u32>().ok());
249                Err(EngineCreateError::Locked(holder))
250            }
251            Err(e) => Err(EngineCreateError::Io(e)),
252        }
253    }
254
255    /// Install (or clear with `None`) the callback that receives every
256    /// [`EngineEvent`].
257    pub fn set_event_sink(&self, sink: Option<EventSink>) {
258        *self.sink.write() = sink;
259    }
260
261    fn emit(&self, ev: EngineEvent) {
262        if let Some(s) = self.sink.read().clone() {
263            s(&ev);
264        }
265    }
266
267    /// Begin indexing the given volumes (asynchronous; progress via events).
268    /// Idempotent per volume label: clients re-send `IndexStart` on every
269    /// (re)connect and the service also calls this at startup, so a volume
270    /// already being indexed is skipped. A duplicate slot would make every
271    /// query return that volume's rows once per copy (search merges all Ready
272    /// slots) — the source of the "each result appears N times" bug.
273    ///
274    /// # Panics
275    ///
276    /// Panics if a volume worker thread cannot be spawned.
277    pub fn index_start(self: &Arc<Self>, volumes: &[String]) {
278        for label in volumes {
279            // Trust boundary: `volumes` reaches us unvalidated — over the pipe
280            // (IndexStart op), through the FFI, and from service startup. A
281            // label must be exactly "<letter>:"; reject anything else here, the
282            // one chokepoint every caller funnels through, so a hostile request
283            // can neither spawn unbounded volume threads nor steer
284            // `snapshot_path` outside the index dir with a `..\` label. Report
285            // it as VolumeFailed — the same way the worker surfaces a volume it
286            // can't open — so the client still gets feedback (don't go silent) and we
287            // never reach the slot/thread/`snapshot_path` path with garbage.
288            if !volume::is_valid_volume_label(label) {
289                tracing::warn!(label = %label, "index_start: rejecting malformed volume label");
290                self.emit(EngineEvent::VolumeFailed {
291                    volume: label.clone(),
292                    message: "malformed volume label (expected \"<letter>:\")".to_string(),
293                });
294                continue;
295            }
296            // Decide-and-insert under one write lock so a concurrent
297            // index_start of the same label can't slip a second slot in.
298            let slot = {
299                let mut vols = self.volumes.write();
300                if vols.iter().any(|s| s.label == *label) {
301                    continue;
302                }
303                let store = Arc::new(seams::WinSnapshotStore::new(volume::snapshot_path(
304                    &self.config.index_dir,
305                    label,
306                )));
307                let slot = Arc::new(VolumeSlot::scanning(label.clone(), store));
308                vols.push(slot.clone());
309                slot
310            };
311            let engine = self.clone();
312            let handle = std::thread::Builder::new()
313                .name(format!("fmf-vol-{label}"))
314                .spawn(move || engine.volume_thread(slot))
315                .expect("spawn volume thread");
316            self.threads.lock().push(handle);
317        }
318    }
319
320    /// Begin a non-elevated **scope-mode** index over `roots` (absolute base
321    /// paths), folder-walked and watched in-process without elevation
322    /// (ADR-0024). Unlike [`Self::index_start`], the change source is
323    /// `ReadDirectoryChangesW`, not the USN journal, and the snapshot lives
324    /// under a single fixed `scope` label (so a hostile `roots` entry can
325    /// never steer `snapshot_path` — only the fixed label does). Idempotent:
326    /// a second call while the scope slot exists is a no-op.
327    ///
328    /// # Panics
329    ///
330    /// Panics if the volume worker thread cannot be spawned.
331    #[cfg(windows)]
332    pub fn index_start_scope(self: &Arc<Self>, roots: &[String], excludes: &[String]) {
333        const SCOPE_LABEL: &str = "scope";
334        let roots: Vec<String> = roots
335            .iter()
336            .map(|r| r.trim().to_string())
337            .filter(|r| !r.is_empty())
338            .collect();
339        let excludes: Vec<String> = excludes
340            .iter()
341            .map(|r| r.trim().to_string())
342            .filter(|r| !r.is_empty())
343            .collect();
344        if roots.is_empty() {
345            tracing::warn!("index_start_scope: no roots configured");
346            self.emit(EngineEvent::VolumeFailed {
347                volume: SCOPE_LABEL.to_string(),
348                message: "スコープ索引のフォルダが未設定です".to_string(),
349            });
350            return;
351        }
352        let slot = {
353            let mut vols = self.volumes.write();
354            if vols.iter().any(|s| s.label == SCOPE_LABEL) {
355                return;
356            }
357            let store = Arc::new(seams::WinSnapshotStore::new(volume::snapshot_path(
358                &self.config.index_dir,
359                SCOPE_LABEL,
360            )));
361            let slot = Arc::new(VolumeSlot::scanning_walk(
362                SCOPE_LABEL.to_string(),
363                store,
364                roots,
365                excludes,
366            ));
367            vols.push(slot.clone());
368            slot
369        };
370        let engine = self.clone();
371        let handle = std::thread::Builder::new()
372            .name(format!("fmf-vol-{SCOPE_LABEL}"))
373            .spawn(move || engine.volume_thread(slot))
374            .expect("spawn volume thread");
375        self.threads.lock().push(handle);
376    }
377
378    /// Per-volume status: `(label, state, files scanned so far)`.
379    pub fn status(&self) -> Vec<(String, VolumeState, u64)> {
380        self.volumes
381            .read()
382            .iter()
383            .map(|s| (s.label.clone(), *s.phase.lock(), *s.scanned.lock()))
384            .collect()
385    }
386
387    /// The engine's metrics hub (counters and the diagnostics ring).
388    pub const fn metrics(&self) -> &MetricsHub {
389        &self.metrics
390    }
391
392    /// Per-volume memory accounting (perf panel / `fmf stats`).
393    pub fn index_stats(&self) -> Vec<crate::metrics::IndexStats> {
394        self.volumes
395            .read()
396            .iter()
397            .filter_map(|slot| {
398                slot.index.read().as_ref().map(|idx| {
399                    let mut s = idx.stats(&slot.label);
400                    s.add_derived_bytes(query::derived_cache_bytes(idx));
401                    s
402                })
403            })
404            .collect()
405    }
406
407    /// Full observability snapshot (JSON-serializable).
408    pub fn metrics_snapshot(&self) -> crate::metrics::MetricsSnapshot {
409        self.metrics.snapshot(64, self.index_stats())
410    }
411
412    /// Persist every Ready volume whose generations moved since its last
413    /// save ("dirty"), using the tailing thread's shared checkpoint. The
414    /// checkpoint may trail the index by an in-flight batch — the USN
415    /// replay on load covers that. Returns the number of snapshots written
416    /// (failed writes are counted in `snapshot_save_failures` and excluded).
417    #[cfg(windows)]
418    pub fn flush(&self) -> usize {
419        let volumes: Vec<_> = self.volumes.read().clone();
420        let mut saved = 0;
421        for slot in volumes {
422            if *slot.phase.lock() != VolumeState::Ready {
423                continue;
424            }
425            // Checkpoint before index: a batch landing in between leaves the
426            // index newer than the checkpoint, never older.
427            let Some(ckpt) = *slot.checkpoint.lock() else {
428                continue;
429            };
430            let dirty = {
431                let guard = slot.index.read();
432                let Some(idx) = guard.as_ref() else { continue };
433                *slot.last_saved.lock()
434                    != Some((idx.content_generation(), idx.structural_generation()))
435            };
436            if !dirty {
437                continue;
438            }
439            if self.save_slot(&slot, ckpt) {
440                saved += 1;
441            }
442        }
443        saved
444    }
445
446    /// Signal every volume thread to stop and join them (bounded wait).
447    pub fn shutdown(&self) {
448        // Close the diag→EngineError forwarding window first: shutdown-time
449        // WARNs (final flush, journal teardown) still reach the log and the
450        // diag ring, but no longer race the dying event sink.
451        *self._diag_guard.lock() = None;
452        for slot in self.volumes.read().iter() {
453            slot.stop.store(true, Ordering::Relaxed);
454        }
455        // Blocked journal reads return on the next volume write; joining with
456        // a bounded wait keeps shutdown prompt without CancelSynchronousIo
457        // (M2 refinement).
458        let mut threads = self.threads.lock();
459        for t in threads.drain(..) {
460            let _ = t.join();
461        }
462    }
463
464    /// Test/dev helper: register an already-built index as a Ready volume.
465    /// The zero checkpoint stands in for a journal position so `flush` can
466    /// exercise the save path on injected volumes.
467    pub fn insert_ready_volume(&self, label: &str, idx: VolumeIndex) {
468        let slot = Arc::new(VolumeSlot {
469            label: label.to_string(),
470            phase: Mutex::new(VolumeState::Ready),
471            scanned: Mutex::new(idx.live_len() as u64),
472            index: RwLock::new(Some(idx)),
473            stop: Arc::new(AtomicBool::new(false)),
474            last_query: Mutex::new(None),
475            checkpoint: Mutex::new(Some(JournalCheckpoint {
476                journal_id: 0,
477                next_usn: 0,
478            })),
479            last_saved: Mutex::new(None),
480            save_lock: Mutex::new(()),
481            store: Arc::new(seams::WinSnapshotStore::new(volume::snapshot_path(
482                &self.config.index_dir,
483                label,
484            ))),
485            kind: volume::WorkerKind::Mft,
486        });
487        self.volumes.write().push(slot);
488    }
489
490    /// Test/dev helper: swap a rebuilt index into an existing Ready volume —
491    /// the same structural replacement a journal-gone full rescan performs.
492    ///
493    /// # Panics
494    ///
495    /// Panics if no volume with the given `label` exists.
496    pub fn replace_ready_volume(&self, label: &str, idx: VolumeIndex) {
497        let volumes = self.volumes.read();
498        let slot = volumes
499            .iter()
500            .find(|s| s.label == label)
501            .expect("replace_ready_volume: unknown volume");
502        *slot.scanned.lock() = idx.live_len() as u64;
503        slot.install_index(idx);
504    }
505}