Skip to main content

fmf_core\scan/
parse.rs

//! Parallel chunk parsing (ADR-0011): record sub-ranges of one chunk fan
//! out across rayon workers, each producing a [`ParsedBatch`]; the builder
//! appends the batches in chunk order, so `EntryId` assignment is
//! deterministic.
5
6use ntfs_reader::api::{FIRST_NORMAL_RECORD, NtfsAttributeType, NtfsFileName};
7use ntfs_reader::file::NtfsFile;
8use rustc_hash::FxHashMap;
9
10use crate::index::{EncodedEntry, Frn, VolumeIndexBuilder};
11use crate::mft::pick_name;
12use crate::wtf8;
13
14use super::ScanStats;
15use super::deferred::EXT_NAME_CACHE_CAP;
16use super::volume_io::apply_fixup;
17
/// Byte length of the sub-range handed to a single parse worker: small
/// enough that a 16MiB chunk spreads over many cores, large enough that the
/// per-task overhead is amortized.
const PARSE_SUB: usize = 1024 * 1024;
21
// Flag bits tested against the `file_attributes` value of
// $STANDARD_INFORMATION when a record's attributes are extracted.
const FILE_ATTRIBUTE_HIDDEN: u32 = 1 << 1;
const FILE_ATTRIBUTE_SYSTEM: u32 = 1 << 2;
const FILE_ATTRIBUTE_REPARSE_POINT: u32 = 1 << 10;
25
26/// Fixed-size record store for the deferred/extension caches: records live
27/// back-to-back in one growable allocation, addressed by slot (ADR-0012).
28pub(super) struct RecordArena {
29    data: Vec<u8>,
30    record_size: usize,
31}
32
33impl RecordArena {
34    pub(super) const fn new(record_size: usize) -> Self {
35        Self {
36            data: Vec::new(),
37            record_size,
38        }
39    }
40
41    fn push(&mut self, rec: &[u8]) -> u32 {
42        debug_assert_eq!(rec.len(), self.record_size);
43        let slot = (self.data.len() / self.record_size) as u32;
44        self.data.extend_from_slice(rec);
45        slot
46    }
47
48    pub(super) fn get(&self, slot: u32) -> &[u8] {
49        let off = slot as usize * self.record_size;
50        &self.data[off..off + self.record_size]
51    }
52}
53
/// Values pulled from a record's $`STANDARD_INFORMATION` and $DATA
/// attributes; the same extract is shared by every parse path.
#[derive(Clone, Copy, Default)]
struct RecordAttrs {
    /// Data length read from a $DATA attribute header (`value_length` when
    /// resident, `data_size` otherwise); stays 0 when none was seen.
    size: u64,
    /// `modification_time` of $`STANDARD_INFORMATION`, stored as `i64`.
    mtime: i64,
    /// `FILE_ATTRIBUTE_HIDDEN` bit of the standard-information attributes.
    is_hidden: bool,
    /// `FILE_ATTRIBUTE_SYSTEM` bit of the standard-information attributes.
    is_system: bool,
    /// `FILE_ATTRIBUTE_REPARSE_POINT` bit of the standard-information
    /// attributes.
    is_reparse: bool,
}
63
64fn extract_attrs(f: &NtfsFile) -> RecordAttrs {
65    let mut a = RecordAttrs::default();
66    f.attributes(|att| {
67        if att.header.type_id == NtfsAttributeType::StandardInformation as u32 {
68            if let Some(si) = att.as_standard_info() {
69                a.mtime = si.modification_time as i64;
70                a.is_reparse = si.file_attributes & FILE_ATTRIBUTE_REPARSE_POINT != 0;
71                a.is_hidden = si.file_attributes & FILE_ATTRIBUTE_HIDDEN != 0;
72                a.is_system = si.file_attributes & FILE_ATTRIBUTE_SYSTEM != 0;
73            }
74        } else if att.header.type_id == NtfsAttributeType::Data as u32 {
75            if att.header.is_non_resident == 0 {
76                if let Some(h) = att.resident_header() {
77                    a.size = h.value_length as u64;
78                }
79            } else if let Some(h) = att.nonresident_header() {
80                a.size = h.data_size;
81            }
82        }
83    });
84    a
85}
86
87/// One entry parsed by a worker; the name lives in its batch's pools.
88struct ParsedMeta {
89    /// Raw OS values (the parse layer stays in `u64`); wrapped into [`Frn`]
90    /// when the entry crosses into the index in `append_batches`.
91    parent_frn: u64,
92    frn: u64,
93    name_off: u32,
94    name_len: u32,
95    is_dir: bool,
96    attrs: RecordAttrs,
97}
98
99/// One worker's output for a record sub-range, in record order.
100#[derive(Default)]
101pub(super) struct ParsedBatch {
102    metas: Vec<ParsedMeta>,
103    name_pool: Vec<u8>,
104    lower_pool: Vec<u8>,
105    /// Raw record bytes referenced by `deferred`/`extensions` — one pool
106    /// per batch instead of a box per record (the global `RecordArena` gets
107    /// them at append time).
108    rec_pool: Vec<u8>,
109    deferred: Vec<(u64, std::ops::Range<usize>)>,
110    /// Extension records carrying a $`FILE_NAME` — the targets the deferred
111    /// pass will need. Keeping them now turns that pass's random disk reads
112    /// into RAM lookups (the whole $MFT just streamed through here anyway).
113    extensions: Vec<(u64, std::ops::Range<usize>)>,
114    files: u64,
115    dirs: u64,
116    corrupt_records: u64,
117    extension_records: u64,
118    skipped_no_name: u64,
119    pub(super) deferred_unresolved: u64,
120    /// Deferred-pass disk reads that failed (`LazyRecordReader`) — folded
121    /// into `ScanStats::deferred_name_read_failures` at append time.
122    pub(super) deferred_name_read_failures: u64,
123}
124
125impl ParsedBatch {
126    fn push_record(&mut self, bytes: &[u8]) -> std::ops::Range<usize> {
127        let start = self.rec_pool.len();
128        self.rec_pool.extend_from_slice(bytes);
129        start..self.rec_pool.len()
130    }
131
132    /// Encode a named record into this batch (WTF-8 pair + meta).
133    pub(super) fn push_named(&mut self, f: &NtfsFile, name: &NtfsFileName) {
134        let name_data = name.data;
135        let units = name.header.name_length as usize;
136        let a = extract_attrs(f);
137        if f.is_directory() {
138            self.dirs += 1;
139        } else {
140            self.files += 1;
141        }
142        let name_off = self.name_pool.len() as u32;
143        wtf8::push_wtf8_pair(
144            &name_data[..units],
145            &mut self.name_pool,
146            &mut self.lower_pool,
147        );
148        self.metas.push(ParsedMeta {
149            parent_frn: name.header.parent_directory_reference,
150            frn: f.reference_number(),
151            name_off,
152            name_len: self.name_pool.len() as u32 - name_off,
153            is_dir: f.is_directory(),
154            attrs: a,
155        });
156    }
157}
158
159/// Validate, fix up and parse every record in `bytes` (a record-aligned
160/// slice whose first byte sits at `first_logical` in the $MFT stream).
161/// Mirrors the sequential loop exactly — same skip conditions, same counts.
162fn parse_subrange(bytes: &mut [u8], first_logical: u64, record_size: usize) -> ParsedBatch {
163    let mut out = ParsedBatch::default();
164    for off in (0..bytes.len()).step_by(record_size) {
165        let number = (first_logical + off as u64) / record_size as u64;
166        if number < FIRST_NORMAL_RECORD {
167            continue; // metafiles; the builder seeds the root itself
168        }
169        let rec = &mut bytes[off..off + record_size];
170        if !NtfsFile::is_valid(rec) {
171            continue;
172        }
173        if !apply_fixup(rec) {
174            out.corrupt_records += 1;
175            continue;
176        }
177        let f = NtfsFile::new(number, rec);
178        if !f.is_used() {
179            continue;
180        }
181        if { f.header.base_reference } & 0x0000_FFFF_FFFF_FFFF != 0 {
182            out.extension_records += 1;
183            if f.get_attribute(NtfsAttributeType::FileName).is_some() {
184                let range = out.push_record(rec);
185                out.extensions.push((number, range));
186            }
187            continue;
188        }
189
190        let Some(name) = pick_name(&f) else {
191            if f.get_attribute(NtfsAttributeType::AttributeList).is_some() {
192                let range = out.push_record(rec);
193                out.deferred.push((number, range));
194            } else {
195                out.skipped_no_name += 1;
196            }
197            continue;
198        };
199        out.push_named(&f, &name);
200    }
201    out
202}
203
204/// Fan a chunk's record sub-ranges across rayon workers. The returned
205/// batches are in sub-range order, so appending them sequentially yields
206/// the same `EntryId` assignment as a fully sequential parse.
207pub(super) fn parse_chunk(
208    chunk: &mut [u8],
209    chunk_logical: u64,
210    record_size: usize,
211) -> Vec<ParsedBatch> {
212    use rayon::prelude::*;
213    let sub = (PARSE_SUB / record_size * record_size).max(record_size);
214    chunk
215        .par_chunks_mut(sub)
216        .enumerate()
217        .map(|(i, bytes)| parse_subrange(bytes, chunk_logical + (i * sub) as u64, record_size))
218        .collect()
219}
220
221pub(super) fn append_batches(
222    b: &mut VolumeIndexBuilder,
223    stats: &mut ScanStats,
224    deferred: &mut Vec<(u64, u32)>,
225    extensions: &mut FxHashMap<u64, u32>,
226    arena: &mut RecordArena,
227    batches: Vec<ParsedBatch>,
228) {
229    for batch in batches {
230        for (number, range) in batch.extensions {
231            if extensions.len() < EXT_NAME_CACHE_CAP {
232                extensions.insert(number, arena.push(&batch.rec_pool[range]));
233            } else {
234                stats.ext_name_cache_skipped += 1;
235            }
236        }
237        for m in &batch.metas {
238            let range = m.name_off as usize..(m.name_off + m.name_len) as usize;
239            b.push_encoded(EncodedEntry {
240                parent_frn: Frn(m.parent_frn),
241                frn: Frn(m.frn),
242                name_wtf8: &batch.name_pool[range.clone()],
243                lower_wtf8: &batch.lower_pool[range],
244                is_dir: m.is_dir,
245                is_reparse: m.attrs.is_reparse,
246                is_hidden: m.attrs.is_hidden,
247                is_system: m.attrs.is_system,
248                size: m.attrs.size,
249                mtime: m.attrs.mtime,
250            });
251        }
252        stats.files += batch.files;
253        stats.dirs += batch.dirs;
254        stats.corrupt_records += batch.corrupt_records;
255        stats.extension_records += batch.extension_records;
256        stats.skipped_no_name += batch.skipped_no_name;
257        stats.deferred_unresolved += batch.deferred_unresolved;
258        stats.deferred_name_read_failures += batch.deferred_name_read_failures;
259        for (number, range) in batch.deferred {
260            deferred.push((number, arena.push(&batch.rec_pool[range])));
261        }
262    }
263}