// fmf_core/engine/volume.rs

//! Per-volume state: the slot the engine and the worker thread share
//! (`VolumeSlot`), the index install rule, the journal checkpoint, and the
//! snapshot save helper. The thread that drives the flow lives in
//! `worker.rs`; the OS-effect seams it runs against live in `seams.rs`.
//! This module also holds the fixed-NTFS-volume enumeration, the snapshot
//! path helper, and drive-label validation.

6use std::sync::Arc;
7use std::sync::atomic::AtomicBool;
8
9use parking_lot::{Mutex, RwLock};
10
11use crate::index::{EntryId, VolumeIndex};
12use crate::metrics::Counters;
13use crate::query::{CompiledQuery, QueryOptions};
14
15use super::seams::SnapshotStore;
16use super::{Engine, VolumeState};
17
18/// Last materialized per-volume result, kept for incremental refinement
19/// (query/subsume.rs) and unchanged-result detection (`QueryTrace::
20/// unchanged`). Validity = both generations still match; USN batches
21/// invalidate implicitly by bumping `content_generation`.
22pub(super) struct VolumeQueryCache {
23    /// The raw query text — equality here (with `opt`) defines "the same
24    /// query" for unchanged detection; subsumption defines refinement.
25    pub(super) text: String,
26    pub(super) compiled: Arc<CompiledQuery>,
27    pub(super) opt: QueryOptions,
28    pub(super) content_generation: u64,
29    pub(super) structural_generation: u64,
30    pub(super) ids: Arc<[EntryId]>,
31}
32
33/// USN position paired with the index state, shared with `Engine::flush`.
34/// The tailing thread owns the journal handle; "save now" from another
35/// thread needs (`journal_id`, `next_usn`) without touching it. Updated *after*
36/// a batch is applied, so a concurrent flush that reads the checkpoint
37/// first always saves checkpoint ≤ index — the USN replay on load covers
38/// the gap (re-applying records is idempotent; skipping them would not be).
39#[derive(Clone, Copy)]
40pub(super) struct JournalCheckpoint {
41    pub(super) journal_id: u64,
42    pub(super) next_usn: i64,
43}
44
45/// What drives a volume slot: the privileged $MFT scan + USN journal, or the
46/// non-elevated folder-walk + `ReadDirectoryChangesW` watcher (scope mode,
47/// ADR-0024). The worker reuses one loop for both; this picks the initial-scan
48/// source and the change source at the two branch points.
49pub(super) enum WorkerKind {
50    /// `mft::scan_volume(label)` + `WinJournalSource` — needs elevation.
51    Mft,
52    /// `scan::walk::walk_scan(roots, excludes)` + `WatcherJournalSource` — no
53    /// elevation. `excludes` prunes matching subtrees at walk time (ADR-0025).
54    Walk {
55        roots: Vec<String>,
56        excludes: Vec<String>,
57    },
58}
59
60pub(super) struct VolumeSlot {
61    pub(super) label: String,
62    pub(super) phase: Mutex<VolumeState>,
63    pub(super) scanned: Mutex<u64>,
64    pub(super) index: RwLock<Option<VolumeIndex>>,
65    pub(super) stop: Arc<AtomicBool>,
66    /// Single-entry query cache (lock order: `index` read first, then this).
67    pub(super) last_query: Mutex<Option<VolumeQueryCache>>,
68    /// None until the volume is Ready (flush skips it).
69    pub(super) checkpoint: Mutex<Option<JournalCheckpoint>>,
70    /// (content, structural) generations at the last snapshot save — the
71    /// dirty check that keeps periodic flushes from rewriting unchanged
72    /// volumes.
73    pub(super) last_saved: Mutex<Option<(u64, u64)>>,
74    /// Serializes snapshot writers for this slot (flush vs. stop-save).
75    pub(super) save_lock: Mutex<()>,
76    /// Snapshot persistence seam for this volume (ADR-0018) — production
77    /// is `WinSnapshotStore` on `snapshot_path(...)`.
78    pub(super) store: Arc<dyn SnapshotStore>,
79    /// Initial-scan + change source this slot drives (ADR-0024).
80    pub(super) kind: WorkerKind,
81}
82
83impl VolumeSlot {
84    /// A privileged ($MFT + USN) slot in its initial Scanning state — the
85    /// shape `index_start` (and the worker tests) spawn workers on.
86    pub(super) fn scanning(label: String, store: Arc<dyn SnapshotStore>) -> Self {
87        Self::scanning_kind(label, store, WorkerKind::Mft)
88    }
89
90    /// A non-elevated scope-mode (folder-walk + watcher) slot (ADR-0024).
91    pub(super) fn scanning_walk(
92        label: String,
93        store: Arc<dyn SnapshotStore>,
94        roots: Vec<String>,
95        excludes: Vec<String>,
96    ) -> Self {
97        Self::scanning_kind(label, store, WorkerKind::Walk { roots, excludes })
98    }
99
100    fn scanning_kind(label: String, store: Arc<dyn SnapshotStore>, kind: WorkerKind) -> Self {
101        Self {
102            label,
103            phase: Mutex::new(VolumeState::Scanning),
104            scanned: Mutex::new(0),
105            index: RwLock::new(None),
106            stop: Arc::new(AtomicBool::new(false)),
107            last_query: Mutex::new(None),
108            checkpoint: Mutex::new(None),
109            last_saved: Mutex::new(None),
110            save_lock: Mutex::new(()),
111            store,
112            kind,
113        }
114    }
115
116    /// Install a freshly built index. Replacing an existing one is a
117    /// structural change (journal-gone full rescan): the new index inherits
118    /// the previous `structural_generation + 1` so open `ResultSet`s go
119    /// hard-stale (docs/ARCHITECTURE.md, 2-layer generation). A first install
120    /// (initial scan or snapshot restore) keeps the value the index was
121    /// built with.
122    pub(super) fn install_index(&self, mut idx: VolumeIndex) {
123        let mut guard = self.index.write();
124        if let Some(prev) = guard.as_ref() {
125            idx.bump_structural_from(prev.structural_generation());
126        }
127        // Generation checks already reject it, but holding onto a dead
128        // index's id list (4B × entries) serves nobody.
129        *self.last_query.lock() = None;
130        *guard = Some(idx);
131    }
132}
133
134impl Engine {
135    /// Fixed NTFS volumes ("C:", "D:", …).
136    #[cfg(windows)]
137    #[must_use]
138    pub fn list_ntfs_volumes() -> Vec<String> {
139        use windows_sys::Win32::Storage::FileSystem::{
140            GetDriveTypeW, GetLogicalDrives, GetVolumeInformationW,
141        };
142        const DRIVE_FIXED: u32 = 3;
143        let mut out = Vec::new();
144        let mask = unsafe { GetLogicalDrives() };
145        for i in 0..26u32 {
146            if mask & (1 << i) == 0 {
147                continue;
148            }
149            let letter = (b'A' + i as u8) as char;
150            let root: Vec<u16> = format!("{letter}:\\").encode_utf16().chain([0]).collect();
151            unsafe {
152                if GetDriveTypeW(root.as_ptr()) != DRIVE_FIXED {
153                    continue;
154                }
155                let mut fs = [0u16; 32];
156                let ok = GetVolumeInformationW(
157                    root.as_ptr(),
158                    std::ptr::null_mut(),
159                    0,
160                    std::ptr::null_mut(),
161                    std::ptr::null_mut(),
162                    std::ptr::null_mut(),
163                    fs.as_mut_ptr(),
164                    fs.len() as u32,
165                );
166                if ok != 0 {
167                    let fs_name: String = String::from_utf16_lossy(
168                        &fs[..fs.iter().position(|&c| c == 0).unwrap_or(0)],
169                    );
170                    if fs_name == "NTFS" {
171                        out.push(format!("{letter}:"));
172                    }
173                }
174            }
175        }
176        out
177    }
178
179    /// Writes the slot's snapshot (via its `SnapshotStore`) under the
180    /// per-slot save lock and records the saved generations (the flush
181    /// dirty check). Returns false on a failed write — already counted and
182    /// logged here.
183    #[cfg(windows)]
184    pub(super) fn save_slot(&self, slot: &VolumeSlot, checkpoint: JournalCheckpoint) -> bool {
185        let _writer = slot.save_lock.lock();
186        let guard = slot.index.read();
187        let Some(idx) = guard.as_ref() else {
188            return false;
189        };
190        let generations = (idx.content_generation(), idx.structural_generation());
191        if let Err(e) = slot
192            .store
193            .save_atomic(idx, checkpoint.journal_id, checkpoint.next_usn)
194        {
195            Counters::bump(&self.metrics.counters.snapshot_save_failures);
196            tracing::warn!(volume = %slot.label, error = %e, "snapshot save failed");
197            return false;
198        }
199        *slot.last_saved.lock() = Some(generations);
200        true
201    }
202}
203
204/// `{index_dir}\{drive-letter}.fmfidx` — the path each volume's
205/// `WinSnapshotStore` is built on.
206pub(super) fn snapshot_path(index_dir: &std::path::Path, label: &str) -> std::path::PathBuf {
207    index_dir.join(format!(
208        "{}.fmfidx",
209        label.trim_end_matches(':').to_ascii_lowercase()
210    ))
211}
212
213/// A drive label is exactly one ASCII letter followed by `':'` ("C:", "d:") —
214/// the shape `list_ntfs_volumes` produces and `snapshot_path` expects. This is
215/// the trust boundary for [`Engine::index_start`](super::Engine::index_start):
216/// validating here bounds the set of distinct labels to a small finite set (so
217/// a hostile caller can't spawn unbounded volume threads) and stops a label
218/// bearing `..\` or path separators from steering `snapshot_path` outside the
219/// index directory.
220pub(super) fn is_valid_volume_label(label: &str) -> bool {
221    let b = label.as_bytes();
222    b.len() == 2 && b[0].is_ascii_alphabetic() && b[1] == b':'
223}