fmf_core\engine/volume.rs
1//! Per-volume state: the slot the engine and the worker thread share
2//! (`VolumeSlot`), the index install rule, the journal checkpoint, and the
3//! snapshot save helper. The thread that drives the flow lives in
4//! `worker.rs`; the OS-effect seams it runs against live in `seams.rs`.
5
6use std::sync::Arc;
7use std::sync::atomic::AtomicBool;
8
9use parking_lot::{Mutex, RwLock};
10
11use crate::index::{EntryId, VolumeIndex};
12use crate::metrics::Counters;
13use crate::query::{CompiledQuery, QueryOptions};
14
15use super::seams::SnapshotStore;
16use super::{Engine, VolumeState};
17
/// Last materialized per-volume result, kept for incremental refinement
/// (query/subsume.rs) and unchanged-result detection
/// (`QueryTrace::unchanged`).
///
/// Validity = both generations still match the live index; USN batches
/// invalidate the entry implicitly by bumping `content_generation`.
pub(super) struct VolumeQueryCache {
    /// The raw query text — equality here (with `opt`) defines "the same
    /// query" for unchanged detection; subsumption defines refinement.
    pub(super) text: String,
    /// Compiled query kept alongside `text` (presumably the compiled form of
    /// that text, used by the subsumption check — confirm in query/subsume.rs).
    pub(super) compiled: Arc<CompiledQuery>,
    /// Query options; part of the "same query" identity together with `text`.
    pub(super) opt: QueryOptions,
    /// Content generation recorded with this result; one half of the
    /// validity check.
    pub(super) content_generation: u64,
    /// Structural generation recorded with this result; the other half of
    /// the validity check.
    pub(super) structural_generation: u64,
    /// The materialized result: the matching entry ids.
    pub(super) ids: Arc<[EntryId]>,
}
32
/// USN position paired with the index state, shared with `Engine::flush`.
///
/// The tailing thread owns the journal handle; "save now" from another
/// thread needs (`journal_id`, `next_usn`) without touching it. Updated *after*
/// a batch is applied, so a concurrent flush that reads the checkpoint
/// first always saves checkpoint ≤ index — the USN replay on load covers
/// the gap (re-applying records is idempotent; skipping them would not be).
#[derive(Clone, Copy)]
pub(super) struct JournalCheckpoint {
    /// Identifier of the journal the position belongs to; handed to the
    /// snapshot store together with `next_usn` when a snapshot is saved.
    pub(super) journal_id: u64,
    /// USN position stored with the snapshot (presumably where replay
    /// resumes on load — confirm against the journal source).
    pub(super) next_usn: i64,
}
44
/// What drives a volume slot: the privileged $MFT scan + USN journal, or the
/// non-elevated folder-walk + `ReadDirectoryChangesW` watcher (scope mode,
/// ADR-0024).
///
/// The worker reuses one loop for both; this picks the initial-scan source
/// and the change source at the two branch points.
pub(super) enum WorkerKind {
    /// `mft::scan_volume(label)` + `WinJournalSource` — needs elevation.
    Mft,
    /// `scan::walk::walk_scan(roots, excludes)` + `WatcherJournalSource` — no
    /// elevation. `excludes` prunes matching subtrees at walk time (ADR-0025).
    Walk {
        /// Roots handed to the folder walk as its starting points.
        roots: Vec<String>,
        /// Exclusions handed to the folder walk; matching subtrees are
        /// pruned while walking.
        excludes: Vec<String>,
    },
}
59
/// Per-volume state shared between the engine and the volume's worker
/// thread. Every mutable part sits behind its own lock (or an atomic), so
/// the slot is only ever used through `&self`.
pub(super) struct VolumeSlot {
    /// Label identifying the volume; used in log output. For drive volumes
    /// this is presumably the "C:" form — confirm for scope-mode slots.
    pub(super) label: String,
    /// Current lifecycle state; a new slot starts in `VolumeState::Scanning`.
    pub(super) phase: Mutex<VolumeState>,
    /// Counter that starts at 0 (presumably entries scanned so far, reported
    /// as scan progress — confirm in worker.rs).
    pub(super) scanned: Mutex<u64>,
    /// The volume's index; `None` until `install_index` installs one.
    pub(super) index: RwLock<Option<VolumeIndex>>,
    /// Shared flag, initially `false` (presumably set to ask the worker to
    /// stop — confirm in worker.rs).
    pub(super) stop: Arc<AtomicBool>,
    /// Single-entry query cache (lock order: `index` read first, then this).
    pub(super) last_query: Mutex<Option<VolumeQueryCache>>,
    /// None until the volume is Ready (flush skips it).
    pub(super) checkpoint: Mutex<Option<JournalCheckpoint>>,
    /// (content, structural) generations at the last snapshot save — the
    /// dirty check that keeps periodic flushes from rewriting unchanged
    /// volumes.
    pub(super) last_saved: Mutex<Option<(u64, u64)>>,
    /// Serializes snapshot writers for this slot (flush vs. stop-save).
    pub(super) save_lock: Mutex<()>,
    /// Snapshot persistence seam for this volume (ADR-0018) — production
    /// is `WinSnapshotStore` on `snapshot_path(...)`.
    pub(super) store: Arc<dyn SnapshotStore>,
    /// Initial-scan + change source this slot drives (ADR-0024).
    pub(super) kind: WorkerKind,
}
82
83impl VolumeSlot {
84 /// A privileged ($MFT + USN) slot in its initial Scanning state — the
85 /// shape `index_start` (and the worker tests) spawn workers on.
86 pub(super) fn scanning(label: String, store: Arc<dyn SnapshotStore>) -> Self {
87 Self::scanning_kind(label, store, WorkerKind::Mft)
88 }
89
90 /// A non-elevated scope-mode (folder-walk + watcher) slot (ADR-0024).
91 pub(super) fn scanning_walk(
92 label: String,
93 store: Arc<dyn SnapshotStore>,
94 roots: Vec<String>,
95 excludes: Vec<String>,
96 ) -> Self {
97 Self::scanning_kind(label, store, WorkerKind::Walk { roots, excludes })
98 }
99
100 fn scanning_kind(label: String, store: Arc<dyn SnapshotStore>, kind: WorkerKind) -> Self {
101 Self {
102 label,
103 phase: Mutex::new(VolumeState::Scanning),
104 scanned: Mutex::new(0),
105 index: RwLock::new(None),
106 stop: Arc::new(AtomicBool::new(false)),
107 last_query: Mutex::new(None),
108 checkpoint: Mutex::new(None),
109 last_saved: Mutex::new(None),
110 save_lock: Mutex::new(()),
111 store,
112 kind,
113 }
114 }
115
116 /// Install a freshly built index. Replacing an existing one is a
117 /// structural change (journal-gone full rescan): the new index inherits
118 /// the previous `structural_generation + 1` so open `ResultSet`s go
119 /// hard-stale (docs/ARCHITECTURE.md, 2-layer generation). A first install
120 /// (initial scan or snapshot restore) keeps the value the index was
121 /// built with.
122 pub(super) fn install_index(&self, mut idx: VolumeIndex) {
123 let mut guard = self.index.write();
124 if let Some(prev) = guard.as_ref() {
125 idx.bump_structural_from(prev.structural_generation());
126 }
127 // Generation checks already reject it, but holding onto a dead
128 // index's id list (4B × entries) serves nobody.
129 *self.last_query.lock() = None;
130 *guard = Some(idx);
131 }
132}
133
134impl Engine {
135 /// Fixed NTFS volumes ("C:", "D:", …).
136 #[cfg(windows)]
137 #[must_use]
138 pub fn list_ntfs_volumes() -> Vec<String> {
139 use windows_sys::Win32::Storage::FileSystem::{
140 GetDriveTypeW, GetLogicalDrives, GetVolumeInformationW,
141 };
142 const DRIVE_FIXED: u32 = 3;
143 let mut out = Vec::new();
144 let mask = unsafe { GetLogicalDrives() };
145 for i in 0..26u32 {
146 if mask & (1 << i) == 0 {
147 continue;
148 }
149 let letter = (b'A' + i as u8) as char;
150 let root: Vec<u16> = format!("{letter}:\\").encode_utf16().chain([0]).collect();
151 unsafe {
152 if GetDriveTypeW(root.as_ptr()) != DRIVE_FIXED {
153 continue;
154 }
155 let mut fs = [0u16; 32];
156 let ok = GetVolumeInformationW(
157 root.as_ptr(),
158 std::ptr::null_mut(),
159 0,
160 std::ptr::null_mut(),
161 std::ptr::null_mut(),
162 std::ptr::null_mut(),
163 fs.as_mut_ptr(),
164 fs.len() as u32,
165 );
166 if ok != 0 {
167 let fs_name: String = String::from_utf16_lossy(
168 &fs[..fs.iter().position(|&c| c == 0).unwrap_or(0)],
169 );
170 if fs_name == "NTFS" {
171 out.push(format!("{letter}:"));
172 }
173 }
174 }
175 }
176 out
177 }
178
179 /// Writes the slot's snapshot (via its `SnapshotStore`) under the
180 /// per-slot save lock and records the saved generations (the flush
181 /// dirty check). Returns false on a failed write — already counted and
182 /// logged here.
183 #[cfg(windows)]
184 pub(super) fn save_slot(&self, slot: &VolumeSlot, checkpoint: JournalCheckpoint) -> bool {
185 let _writer = slot.save_lock.lock();
186 let guard = slot.index.read();
187 let Some(idx) = guard.as_ref() else {
188 return false;
189 };
190 let generations = (idx.content_generation(), idx.structural_generation());
191 if let Err(e) = slot
192 .store
193 .save_atomic(idx, checkpoint.journal_id, checkpoint.next_usn)
194 {
195 Counters::bump(&self.metrics.counters.snapshot_save_failures);
196 tracing::warn!(volume = %slot.label, error = %e, "snapshot save failed");
197 return false;
198 }
199 *slot.last_saved.lock() = Some(generations);
200 true
201 }
202}
203
204/// `{index_dir}\{drive-letter}.fmfidx` — the path each volume's
205/// `WinSnapshotStore` is built on.
206pub(super) fn snapshot_path(index_dir: &std::path::Path, label: &str) -> std::path::PathBuf {
207 index_dir.join(format!(
208 "{}.fmfidx",
209 label.trim_end_matches(':').to_ascii_lowercase()
210 ))
211}
212
213/// A drive label is exactly one ASCII letter followed by `':'` ("C:", "d:") —
214/// the shape `list_ntfs_volumes` produces and `snapshot_path` expects. This is
215/// the trust boundary for [`Engine::index_start`](super::Engine::index_start):
216/// validating here bounds the set of distinct labels to a small finite set (so
217/// a hostile caller can't spawn unbounded volume threads) and stops a label
218/// bearing `..\` or path separators from steering `snapshot_path` outside the
219/// index directory.
220pub(super) fn is_valid_volume_label(label: &str) -> bool {
221 let b = label.as_bytes();
222 b.len() == 2 && b[0].is_ascii_alphabetic() && b[1] == b':'
223}