fmf_core\scan/mod.rs
1//! Streaming $MFT scanner (ADR-0011).
2//!
3//! The $MFT's data runs are read in 16MiB aligned chunks through our own
4//! volume handle, records are fixed up and parsed per chunk, and the
5//! buffers are recycled — peak RAM is bounded at a few chunks. ntfs-reader
6//! provides the bootstrap (boot-sector geometry + record 0's data runs) and
7//! the per-record attribute parsing types.
8//!
9//! Two layers of overlap (entry order stays byte-for-byte identical to a
10//! sequential scan):
11//! - a dedicated I/O thread reads chunk N+1 while chunk N parses
12//! (`pipeline::run_chunk_pipeline`; degrades to inline reads if the
13//! thread can't start — `scan_pipeline_fallbacks`)
14//! - within a chunk, record sub-ranges parse on rayon workers that carry
15//! the WTF-8 encoding too (`parse::parse_chunk`); the builder then
16//! appends the worker batches in chunk order, so `EntryId` assignment is
17//! deterministic.
18
19mod deferred;
20mod parse;
21mod pipeline;
22mod probe;
23mod volume_io;
24pub mod walk;
25mod walk_id;
26
27pub use probe::{IoProbeMode, ProbeStats, io_probe};
28
29use std::time::{Duration, Instant};
30
31use ntfs_reader::api::ROOT_RECORD;
32use ntfs_reader::errors::NtfsReaderError;
33use rustc_hash::FxHashMap;
34
35use crate::index::{VolumeIndex, VolumeIndexBuilder};
36use crate::mft::{MftError, peak_working_set};
37
38use deferred::resolve_deferred;
39use parse::{RecordArena, append_batches, parse_chunk};
40use pipeline::{plan_chunks, run_chunk_pipeline};
41use volume_io::mft_layout;
42
/// Counters and timings collected while building a volume index.
///
/// Every field defaults to zero (or the empty string) via [`Default`]; the
/// scan fills in whatever applies to the mode it ran in. All `elapsed_*`
/// fields are in milliseconds.
#[derive(Debug, Default)]
pub struct ScanStats {
    /// The drive letter spec this scan covered, e.g. `C:`.
    pub volume: String,
    /// End-to-end wall-clock time of scan + build (ms).
    pub elapsed_total_ms: u64,
    /// Summed device-read time (ms). On the pipelined path reads overlap
    /// with parsing, so read + parse + build + sort can add up to more than
    /// `elapsed_total_ms`.
    pub elapsed_mft_load_ms: u64,
    /// Summed record-parse time (ms): fixup, attribute walk and WTF-8.
    pub elapsed_parse_ms: u64,
    /// Time spent resolving deferred `$ATTRIBUTE_LIST` names (ms).
    pub elapsed_deferred_ms: u64,
    /// How many records needed the deferred name pass at all.
    pub deferred_names: u64,
    /// Builder finish, first part (ms): parent resolution + EXCLUDED
    /// propagation.
    pub elapsed_build_ms: u64,
    /// Builder finish, second part (ms): the three permutation sorts.
    pub elapsed_sort_ms: u64,
    /// 1 if the read-ahead I/O thread failed to start and the scan fell
    /// back to inline sequential reads.
    pub pipeline_fallbacks: u64,
    /// Number of files indexed.
    pub files: u64,
    /// Number of directories indexed.
    pub dirs: u64,
    /// Number of records dropped because no usable name could be resolved.
    pub skipped_no_name: u64,
    /// Peak working-set RAM of the scanning process, in bytes.
    pub peak_working_set_bytes: u64,
    /// Raw $MFT size in bytes — what the initial scan has to read.
    pub mft_bytes: u64,
    /// Extension records (`base_reference` != 0). These are parts of other
    /// files and are correctly not indexed on their own.
    pub extension_records: u64,
    /// Records that failed signature/fixup validation.
    pub corrupt_records: u64,
    /// Deferred `$ATTRIBUTE_LIST` records whose name never resolved.
    pub deferred_unresolved: u64,
    /// Name-bearing extension records beyond the in-RAM cache cap; their
    /// targets fall back to disk reads in the deferred pass.
    pub ext_name_cache_skipped: u64,
    /// Targeted disk reads of the deferred pass that failed. Each one is a
    /// name that stays unresolved until the next rescan.
    pub deferred_name_read_failures: u64,
    /// Scope-mode (folder-walk, ADR-0024) only: directories enumerated.
    pub walk_dirs: u64,
    /// Scope-mode only: files enumerated.
    pub walk_files: u64,
    /// Scope-mode only: wall-clock time of the enumeration phase (ms).
    pub elapsed_walk_ms: u64,
    /// Scope-mode only: roots/dirs/entries skipped because they could not
    /// be read (permission, vanished). The worker maps this to a counter +
    /// warn.
    pub walk_read_errors: u64,
    /// Scope-mode only: subtrees left undescended because they hit
    /// `MAX_DEPTH`.
    pub walk_depth_truncated: u64,
    /// Scope-mode only: directories (with their subtrees) skipped because
    /// they matched a user exclude (ADR-0025). This is normal behaviour, not
    /// a degradation — it shows up in the scan-complete log, never as a
    /// degrade counter.
    pub walk_excluded_pruned: u64,
}
105
106/// Full initial scan: stream the volume's $MFT and build the in-memory
107/// index. `drive` is a drive letter spec like `C:`.
108///
109/// # Errors
110///
111/// Returns [`MftError::NotElevated`] when the process lacks the privileges to
112/// open the raw volume, or [`MftError::Ntfs`] if opening the volume or
113/// reading the $MFT fails.
114pub fn scan_volume(drive: &str) -> Result<(VolumeIndex, ScanStats), MftError> {
115 let drive = drive.trim_end_matches(['\\', '/']);
116 let volume_path = format!(r"\\.\{drive}");
117 let mut stats = ScanStats {
118 volume: drive.to_string(),
119 ..Default::default()
120 };
121
122 let t0 = Instant::now();
123 let (record_size, data_size, runmap) = mft_layout(&volume_path).map_err(|e| match e {
124 NtfsReaderError::ElevationError => MftError::NotElevated,
125 other => MftError::Ntfs(other),
126 })?;
127 stats.mft_bytes = data_size;
128
129 let chunks = plan_chunks(&runmap, data_size, record_size);
130 let mut b = VolumeIndexBuilder::new(drive, ROOT_RECORD);
131 let mut deferred: Vec<(u64, u32)> = Vec::new();
132 let mut extensions: FxHashMap<u64, u32> = FxHashMap::default();
133 let mut arena = RecordArena::new(record_size);
134 let mut parse_time = Duration::ZERO;
135
136 let (read_time, fallbacks) = run_chunk_pipeline(&volume_path, &chunks, &mut |i, bytes| {
137 let t = Instant::now();
138 let batches = parse_chunk(bytes, chunks[i].logical, record_size);
139 append_batches(
140 &mut b,
141 &mut stats,
142 &mut deferred,
143 &mut extensions,
144 &mut arena,
145 batches,
146 );
147 parse_time += t.elapsed();
148 })
149 .map_err(MftError::Ntfs)?;
150 stats.elapsed_mft_load_ms = read_time.as_millis() as u64;
151 stats.elapsed_parse_ms = parse_time.as_millis() as u64;
152 stats.pipeline_fallbacks = fallbacks;
153
154 // Deferred pass: names hiding behind $ATTRIBUTE_LIST, resolved in
155 // parallel from the streamed extension-record cache (ADR-0011).
156 let t_deferred = Instant::now();
157 stats.deferred_names = deferred.len() as u64;
158 let batches = resolve_deferred(
159 &volume_path,
160 &runmap,
161 record_size,
162 &extensions,
163 &arena,
164 &deferred,
165 );
166 append_batches(
167 &mut b,
168 &mut stats,
169 &mut Vec::new(),
170 &mut FxHashMap::default(),
171 &mut RecordArena::new(record_size),
172 batches,
173 );
174 stats.elapsed_deferred_ms = t_deferred.elapsed().as_millis() as u64;
175 drop(extensions);
176 drop(deferred);
177 drop(arena);
178 // Cache overflow (`ext_name_cache_skipped`) and failed deferred reads
179 // (`deferred_name_read_failures`) are returned in ScanStats only; the
180 // volume worker maps them into counters + warn at its single mapping
181 // point (engine/worker.rs).
182
183 // Degradations are normal in small numbers; make them visible either way.
184 if stats.corrupt_records > 0 {
185 tracing::warn!(volume = %drive, count = stats.corrupt_records, "corrupt MFT records skipped");
186 }
187 if stats.deferred_unresolved > 0 {
188 tracing::warn!(
189 volume = %drive,
190 count = stats.deferred_unresolved,
191 "attribute-list names unresolved"
192 );
193 }
194
195 let (idx, finish) = b.finish_timed();
196 stats.elapsed_build_ms = finish.build_ms;
197 stats.elapsed_sort_ms = finish.sort_ms;
198 stats.elapsed_total_ms = t0.elapsed().as_millis() as u64;
199 stats.peak_working_set_bytes = peak_working_set();
200 Ok((idx, stats))
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Equivalence gate against the whole-load reference path. Run from an
    /// elevated shell: `FMF_ADMIN_TESTS=1 cargo test -- --ignored streaming`.
    /// The volume is live, so a small drift tolerance is allowed.
    ///
    /// Three checks, in order:
    /// 1. total entry counts agree within 1/500 of the reference count;
    /// 2. sampled records (every 997th reference entry) agree on name, with
    ///    the on-disk path as arbiter when the two resolvers differ;
    /// 3. the same samples agree on size, with a looser bar.
    #[test]
    #[ignore = "requires elevation; gated by FMF_ADMIN_TESTS"]
    fn streaming_scan_matches_reference() {
        if std::env::var("FMF_ADMIN_TESTS").as_deref() != Ok("1") {
            eprintln!("FMF_ADMIN_TESTS != 1 — skipping");
            return;
        }
        let (new_idx, new_stats) = scan_volume("C:").expect("streaming scan");
        let (old_idx, old_stats) = crate::mft::scan_volume_reference("C:").expect("reference");

        // Inclusive bound: the integer division rounds the tolerance down to
        // 0 for a reference index under 500 entries, and a strict `<` would
        // then reject even two identical indexes.
        let tolerance = old_idx.len() as u64 / 500;
        let drift = new_idx.len().abs_diff(old_idx.len()) as u64;
        assert!(
            drift <= tolerance,
            "entry counts diverged: streaming {} vs reference {} (files {}/{} dirs {}/{})",
            new_idx.len(),
            old_idx.len(),
            new_stats.files,
            old_stats.files,
            new_stats.dirs,
            old_stats.dirs,
        );

        // Sampled records must agree on name and size where both saw them.
        // Reparse points are excluded: pick_name keeps their names on
        // purpose while the reference's get_best_file_name skips them, so
        // the two resolvers legitimately disagree there (and on this class
        // only — see the module docs of `pick_name`).
        let mut checked = 0u64;
        let mut matched = 0u64;
        let mut size_matched = 0u64;
        // Capped at 16 entries; only used to make the failure message useful.
        let mut mismatches: Vec<String> = Vec::new();
        for sample in (0..old_idx.len() as u32).step_by(997) {
            let old_rec = old_idx.frn(sample).record();
            // Only records present in both indexes can be compared.
            let (Some(o), Some(n)) = (
                old_idx.entry_by_record(old_rec),
                new_idx.entry_by_record(old_rec),
            ) else {
                continue;
            };
            if old_idx.is_reparse(o) || new_idx.is_reparse(n) {
                continue;
            }
            checked += 1;
            if old_idx.name(o) == new_idx.name(n) {
                matched += 1;
            } else {
                use std::os::windows::ffi::OsStringExt;

                // The resolvers legitimately disagree on attribute-list
                // names: get_best_file_name returns the *first* $FILE_NAME
                // of a target record (often the DOS 8.3 short name) and the
                // first Win32 link of hardlinked files, while pick_name
                // scans for the best Win32 name. Arbitrate with the disk:
                // if the streaming-derived full path exists, the streaming
                // name is right.
                let mut p = Vec::new();
                new_idx.append_path(n, &mut p);
                let mut units = Vec::new();
                crate::wtf8::wtf8_to_utf16(&p, &mut units);
                let path = std::path::PathBuf::from(std::ffi::OsString::from_wide(&units));
                if std::fs::symlink_metadata(&path).is_ok() {
                    matched += 1;
                } else if mismatches.len() < 16 {
                    mismatches.push(format!(
                        "record {}: reference `{}` vs streaming `{}` (path gone: {})",
                        old_rec.0,
                        String::from_utf8_lossy(old_idx.name(o)),
                        String::from_utf8_lossy(new_idx.name(n)),
                        path.display(),
                    ));
                }
            }
            if old_idx.size(o) == new_idx.size(n) {
                size_matched += 1;
            }
        }
        // Guards the ratio asserts below against a meaningless (or empty)
        // sample.
        assert!(checked > 100, "sample too small: {checked}");
        assert!(
            matched as f64 / checked as f64 > 0.999,
            "sampled name mismatch: {matched}/{checked}\n{}",
            mismatches.join("\n")
        );
        // Sizes drift legitimately: the volume is live and the two scans run
        // a minute apart, so actively-written files differ. Names only move
        // on renames — hence the looser size bar.
        assert!(
            size_matched as f64 / checked as f64 > 0.99,
            "sampled size mismatch: {size_matched}/{checked}"
        );
    }
}