Skip to main content

fmf_core\scan/
mod.rs

//! Streaming $MFT scanner (ADR-0011).
//!
//! The $MFT's data runs are read in 16MiB aligned chunks through our own
//! volume handle, records are fixed up and parsed per chunk, and the
//! buffers are recycled — peak RAM is bounded at a few chunks. ntfs-reader
//! provides the bootstrap (boot-sector geometry + record 0's data runs) and
//! the per-record attribute parsing types.
//!
//! Two layers of overlap (entry order stays byte-for-byte identical to a
//! sequential scan):
//! - a dedicated I/O thread reads chunk N+1 while chunk N parses
//!   (`pipeline::run_chunk_pipeline`; degrades to inline reads if the
//!   thread can't start — `scan_pipeline_fallbacks`)
//! - within a chunk, record sub-ranges parse on rayon workers that carry
//!   the WTF-8 encoding too (`parse::parse_chunk`); the builder then
//!   appends the worker batches in chunk order, so `EntryId` assignment is
//!   deterministic.
18
19mod deferred;
20mod parse;
21mod pipeline;
22mod probe;
23mod volume_io;
24pub mod walk;
25mod walk_id;
26
27pub use probe::{IoProbeMode, ProbeStats, io_probe};
28
29use std::time::{Duration, Instant};
30
31use ntfs_reader::api::ROOT_RECORD;
32use ntfs_reader::errors::NtfsReaderError;
33use rustc_hash::FxHashMap;
34
35use crate::index::{VolumeIndex, VolumeIndexBuilder};
36use crate::mft::{MftError, peak_working_set};
37
38use deferred::resolve_deferred;
39use parse::{RecordArena, append_batches, parse_chunk};
40use pipeline::{plan_chunks, run_chunk_pipeline};
41use volume_io::mft_layout;
42
/// Statistics from a full index build.
///
/// Every counter starts at zero via [`Default`]; the scan fills in whatever
/// applies to its mode. Fields prefixed `elapsed_` are durations in
/// milliseconds, and the `walk_*` fields are only meaningful in scope mode.
#[derive(Debug, Default)]
pub struct ScanStats {
    /// Drive letter spec that was scanned (e.g. `C:`).
    pub volume: String,
    /// Wall-clock time for the whole scan + build (ms).
    pub elapsed_total_ms: u64,
    /// Accumulated device-read time (ms). Overlaps with parsing on the
    /// pipelined path, so read + parse + build + sort may exceed total.
    pub elapsed_mft_load_ms: u64,
    /// Accumulated record-parse time (fixup + attribute walk + WTF-8), in ms.
    pub elapsed_parse_ms: u64,
    /// Deferred `$ATTRIBUTE_LIST` name resolution time (ms).
    pub elapsed_deferred_ms: u64,
    /// Records whose name needed the deferred pass at all.
    pub deferred_names: u64,
    /// Builder finish: parent resolution + EXCLUDED propagation (ms).
    pub elapsed_build_ms: u64,
    /// Builder finish: the three permutation sorts (ms).
    pub elapsed_sort_ms: u64,
    /// 1 when the read-ahead I/O thread could not start and the scan
    /// degraded to inline sequential reads.
    pub pipeline_fallbacks: u64,
    /// Files indexed (count).
    pub files: u64,
    /// Directories indexed (count).
    pub dirs: u64,
    /// Records dropped because no usable name could be resolved (count).
    pub skipped_no_name: u64,
    /// Peak working-set RAM of the scanning process (bytes).
    pub peak_working_set_bytes: u64,
    /// Raw $MFT size — the bytes the initial scan reads.
    pub mft_bytes: u64,
    /// Extension records (`base_reference` != 0) — parts of other files,
    /// correctly not indexed standalone.
    pub extension_records: u64,
    /// Records failing signature/fixup validation.
    pub corrupt_records: u64,
    /// Deferred `$ATTRIBUTE_LIST` records whose name never resolved.
    pub deferred_unresolved: u64,
    /// Name-bearing extension records past the in-RAM cache cap (those
    /// targets fall back to disk reads in the deferred pass).
    pub ext_name_cache_skipped: u64,
    /// Deferred-pass targeted disk reads that failed — each one is a name
    /// that stays unresolved until the next rescan.
    pub deferred_name_read_failures: u64,
    /// Scope-mode (folder-walk, ADR-0024) only: directories enumerated.
    pub walk_dirs: u64,
    /// Scope-mode only: files enumerated.
    pub walk_files: u64,
    /// Scope-mode only: wall-clock of the enumeration phase (ms).
    pub elapsed_walk_ms: u64,
    /// Scope-mode only: roots/dirs/entries skipped because they could not be
    /// read (permission, vanished). The worker maps this to a counter + warn.
    pub walk_read_errors: u64,
    /// Scope-mode only: subtrees not descended because they hit `MAX_DEPTH`.
    pub walk_depth_truncated: u64,
    /// Scope-mode only: directories (and their subtrees) skipped because they
    /// matched a user exclude (ADR-0025). Normal behaviour, not a degradation —
    /// surfaced in the scan-complete log, never a degrade counter.
    pub walk_excluded_pruned: u64,
}
105
106/// Full initial scan: stream the volume's $MFT and build the in-memory
107/// index. `drive` is a drive letter spec like `C:`.
108///
109/// # Errors
110///
111/// Returns [`MftError::NotElevated`] when the process lacks the privileges to
112/// open the raw volume, or [`MftError::Ntfs`] if opening the volume or
113/// reading the $MFT fails.
114pub fn scan_volume(drive: &str) -> Result<(VolumeIndex, ScanStats), MftError> {
115    let drive = drive.trim_end_matches(['\\', '/']);
116    let volume_path = format!(r"\\.\{drive}");
117    let mut stats = ScanStats {
118        volume: drive.to_string(),
119        ..Default::default()
120    };
121
122    let t0 = Instant::now();
123    let (record_size, data_size, runmap) = mft_layout(&volume_path).map_err(|e| match e {
124        NtfsReaderError::ElevationError => MftError::NotElevated,
125        other => MftError::Ntfs(other),
126    })?;
127    stats.mft_bytes = data_size;
128
129    let chunks = plan_chunks(&runmap, data_size, record_size);
130    let mut b = VolumeIndexBuilder::new(drive, ROOT_RECORD);
131    let mut deferred: Vec<(u64, u32)> = Vec::new();
132    let mut extensions: FxHashMap<u64, u32> = FxHashMap::default();
133    let mut arena = RecordArena::new(record_size);
134    let mut parse_time = Duration::ZERO;
135
136    let (read_time, fallbacks) = run_chunk_pipeline(&volume_path, &chunks, &mut |i, bytes| {
137        let t = Instant::now();
138        let batches = parse_chunk(bytes, chunks[i].logical, record_size);
139        append_batches(
140            &mut b,
141            &mut stats,
142            &mut deferred,
143            &mut extensions,
144            &mut arena,
145            batches,
146        );
147        parse_time += t.elapsed();
148    })
149    .map_err(MftError::Ntfs)?;
150    stats.elapsed_mft_load_ms = read_time.as_millis() as u64;
151    stats.elapsed_parse_ms = parse_time.as_millis() as u64;
152    stats.pipeline_fallbacks = fallbacks;
153
154    // Deferred pass: names hiding behind $ATTRIBUTE_LIST, resolved in
155    // parallel from the streamed extension-record cache (ADR-0011).
156    let t_deferred = Instant::now();
157    stats.deferred_names = deferred.len() as u64;
158    let batches = resolve_deferred(
159        &volume_path,
160        &runmap,
161        record_size,
162        &extensions,
163        &arena,
164        &deferred,
165    );
166    append_batches(
167        &mut b,
168        &mut stats,
169        &mut Vec::new(),
170        &mut FxHashMap::default(),
171        &mut RecordArena::new(record_size),
172        batches,
173    );
174    stats.elapsed_deferred_ms = t_deferred.elapsed().as_millis() as u64;
175    drop(extensions);
176    drop(deferred);
177    drop(arena);
178    // Cache overflow (`ext_name_cache_skipped`) and failed deferred reads
179    // (`deferred_name_read_failures`) are returned in ScanStats only; the
180    // volume worker maps them into counters + warn at its single mapping
181    // point (engine/worker.rs).
182
183    // Degradations are normal in small numbers; make them visible either way.
184    if stats.corrupt_records > 0 {
185        tracing::warn!(volume = %drive, count = stats.corrupt_records, "corrupt MFT records skipped");
186    }
187    if stats.deferred_unresolved > 0 {
188        tracing::warn!(
189            volume = %drive,
190            count = stats.deferred_unresolved,
191            "attribute-list names unresolved"
192        );
193    }
194
195    let (idx, finish) = b.finish_timed();
196    stats.elapsed_build_ms = finish.build_ms;
197    stats.elapsed_sort_ms = finish.sort_ms;
198    stats.elapsed_total_ms = t0.elapsed().as_millis() as u64;
199    stats.peak_working_set_bytes = peak_working_set();
200    Ok((idx, stats))
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Equivalence gate against the whole-load reference path. Run from an
    /// elevated shell: `FMF_ADMIN_TESTS=1 cargo test -- --ignored streaming`.
    /// The volume is live, so a small drift tolerance is allowed.
    #[test]
    #[ignore = "requires elevation; gated by FMF_ADMIN_TESTS"]
    fn streaming_scan_matches_reference() {
        if std::env::var("FMF_ADMIN_TESTS").as_deref() != Ok("1") {
            eprintln!("FMF_ADMIN_TESTS != 1 — skipping");
            return;
        }
        let (new_idx, new_stats) = scan_volume("C:").expect("streaming scan");
        let (old_idx, old_stats) = crate::mft::scan_volume_reference("C:").expect("reference");

        // Entry counts may differ by under 0.2% of the reference count.
        let drift = new_idx.len().abs_diff(old_idx.len()) as u64;
        assert!(
            drift < old_idx.len() as u64 / 500,
            "entry counts diverged: streaming {} vs reference {} (files {}/{} dirs {}/{})",
            new_idx.len(),
            old_idx.len(),
            new_stats.files,
            old_stats.files,
            new_stats.dirs,
            old_stats.dirs,
        );

        // Sampled records must agree on name and size where both saw them.
        // Reparse points are excluded: pick_name keeps their names on
        // purpose while the reference's get_best_file_name skips them, so
        // the two resolvers legitimately disagree there (and on this class
        // only — see the docs on `pick_name`).
        let mut checked = 0u64;
        let mut matched = 0u64;
        let mut size_matched = 0u64;
        let mut mismatches: Vec<String> = Vec::new();
        // Every 997th entry of the reference index is sampled.
        for sample in (0..old_idx.len() as u32).step_by(997) {
            let old_rec = old_idx.frn(sample).record();
            let (Some(o), Some(n)) = (
                old_idx.entry_by_record(old_rec),
                new_idx.entry_by_record(old_rec),
            ) else {
                continue;
            };
            if old_idx.is_reparse(o) || new_idx.is_reparse(n) {
                continue;
            }
            checked += 1;
            if old_idx.name(o) == new_idx.name(n) {
                matched += 1;
            } else {
                use std::os::windows::ffi::OsStringExt;

                // The resolvers legitimately disagree on attribute-list
                // names: get_best_file_name returns the *first* $FILE_NAME
                // of a target record (often the DOS 8.3 short name) and the
                // first Win32 link of hardlinked files, while pick_name
                // scans for the best Win32 name. Arbitrate with the disk:
                // if the streaming-derived full path exists, the streaming
                // name is right.
                let mut p = Vec::new();
                new_idx.append_path(n, &mut p);
                let mut units = Vec::new();
                crate::wtf8::wtf8_to_utf16(&p, &mut units);
                let path = std::path::PathBuf::from(std::ffi::OsString::from_wide(&units));
                if std::fs::symlink_metadata(&path).is_ok() {
                    matched += 1;
                } else if mismatches.len() < 16 {
                    // Cap the report at 16 lines so a bad run stays readable.
                    mismatches.push(format!(
                        "record {}: reference `{}` vs streaming `{}` (path gone: {})",
                        old_rec.0,
                        String::from_utf8_lossy(old_idx.name(o)),
                        String::from_utf8_lossy(new_idx.name(n)),
                        path.display(),
                    ));
                }
            }
            if old_idx.size(o) == new_idx.size(n) {
                size_matched += 1;
            }
        }
        assert!(checked > 100, "sample too small: {checked}");
        assert!(
            matched as f64 / checked as f64 > 0.999,
            "sampled name mismatch: {matched}/{checked}\n{}",
            mismatches.join("\n")
        );
        // Sizes drift legitimately: the volume is live and the two scans run
        // a minute apart, so actively-written files differ. Names only move
        // on renames — hence the looser size bar.
        assert!(
            size_matched as f64 / checked as f64 > 0.99,
            "sampled size mismatch: {size_matched}/{checked}"
        );
    }
}