Skip to main content

fmf_core\usn/
apply.rs

1//! Reduce a journal batch to per-FRN final operations and apply them to the
2//! index.
3//!
4//! Reason flags are aggregated per FRN first (a rename storm touching one
5//! file collapses to a single upsert — docs/ARCHITECTURE.md), then ops run in
6//! first-touch order so that `mkdir a; touch a\b` resolves parents.
7
8use rustc_hash::FxHashMap;
9
10use super::records::{UsnRecord, reason};
11use crate::index::{Frn, RawEntry, RecordNo, VolumeIndex};
12
/// Size/mtime lookup for created/changed files. The USN record carries
/// neither (RESEARCH.md), so the live session asks the volume; replay tests
/// inject canned values.
///
/// `apply_batch` receives the fetcher as `&dyn StatFetcher` and hands it the
/// full `frn` value of the latest record for a file (not the masked record
/// number used as the index key).
pub trait StatFetcher {
    /// Look up the current size (bytes) and mtime for an FRN, or `None` when
    /// the file is already gone or the fetcher has no answer.
    ///
    /// The returned tuple is `(size, mtime)`. `apply_batch` stores both
    /// values verbatim; the unit/epoch of `mtime` is whatever the
    /// implementation chooses (not defined in this module).
    fn stat(&self, frn: u64) -> Option<(u64, i64)>;
}
21
22/// Fetcher that never answers — entries keep carried-over (or zero) values.
23pub struct NullStatFetcher;
24impl StatFetcher for NullStatFetcher {
25    fn stat(&self, _frn: u64) -> Option<(u64, i64)> {
26        None
27    }
28}
29
/// Outcome tally for one applied journal batch.
///
/// `apply_batch` starts from `BatchStats::default()` (all zeros) and bumps
/// counters per aggregated operation, not per raw journal record — a storm of
/// records touching one file contributes a single increment.
///
/// All fields are plain counters, so the struct is `Copy`: callers can keep a
/// snapshot or fold several batches into a running total without moving the
/// value out.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct BatchStats {
    /// Files/dirs created or renamed (upserted or moved in place). A failed
    /// size/mtime lookup does not prevent this from being counted; the entry
    /// is still written with carried-over or zero values.
    pub created_or_renamed: u32,
    /// Entries removed from the index because they were tombstoned.
    pub deleted: u32,
    /// Existing entries whose size/mtime were refreshed.
    pub stat_updated: u32,
    /// Operations that resolved to no counted index change: a delete of an
    /// entry that was never present, a stat refresh whose lookup failed or
    /// whose entry is unknown, or reason flags this module does not act on.
    pub ignored: u32,
    /// Volume lookups (size/mtime) that came back empty — usually the file
    /// vanished before we could stat it; floods indicate a real problem.
    /// Counted in addition to `created_or_renamed` / `ignored`, not instead.
    pub stat_failures: u32,
}
46
/// Accumulator for all records in a batch that refer to the same file.
/// Built during the aggregation pass of `apply_batch` and consumed when the
/// corresponding operation is applied to the index.
struct Agg {
    /// Bitwise OR of the `reason` flags of every record folded into this
    /// aggregate; decides which operation (delete / upsert / stat) is run.
    reasons: u32,
    /// Index into the batch of the latest record for this FRN (carries the
    /// final name/parent/attributes).
    last: usize,
}
53
/// Reason flags that only affect an existing entry's metadata. When an
/// aggregate carries any of these (and no delete/create/rename flag, which
/// `apply_batch` checks first) the entry's hidden/system attributes and its
/// size/mtime are refreshed.
const STAT_REASONS: u32 = reason::DATA_OVERWRITE
    | reason::DATA_EXTEND
    | reason::DATA_TRUNCATION
    | reason::BASIC_INFO_CHANGE;
58
59/// Apply one journal batch. Bumps the content generation exactly once.
60pub fn apply_batch(
61    idx: &mut VolumeIndex,
62    records: &[UsnRecord],
63    fetch: &dyn StatFetcher,
64) -> BatchStats {
65    let mut stats = BatchStats::default();
66    let mut order: Vec<RecordNo> = Vec::new();
67    let mut agg: FxHashMap<RecordNo, Agg> = FxHashMap::default();
68
69    for (i, r) in records.iter().enumerate() {
70        let key = Frn(r.frn).record();
71        match agg.entry(key) {
72            std::collections::hash_map::Entry::Occupied(mut e) => {
73                let a = e.get_mut();
74                a.reasons |= r.reason;
75                a.last = i;
76            }
77            std::collections::hash_map::Entry::Vacant(v) => {
78                v.insert(Agg {
79                    reasons: r.reason,
80                    last: i,
81                });
82                order.push(key);
83            }
84        }
85    }
86
87    let first_new = idx.len() as u32;
88    for key in order {
89        let a = &agg[&key];
90        let last = &records[a.last];
91
92        if a.reasons & reason::FILE_DELETE != 0 {
93            if idx.delete(key).is_some() {
94                stats.deleted += 1;
95            } else {
96                stats.ignored += 1;
97            }
98        } else if a.reasons & (reason::FILE_CREATE | reason::RENAME_NEW_NAME) != 0 {
99            // Directory rename/move must keep the EntryId stable (children
100            // point at it) — handled in place. Files go tombstone+new.
101            let existing = idx.entry_by_record(key);
102            if let Some(old) = existing
103                && idx.is_dir(old)
104                && last.is_dir()
105            {
106                idx.rename_dir_in_place(key, &last.name, Frn(last.parent_frn).record());
107                stats.created_or_renamed += 1;
108                continue;
109            }
110            // Carry size/mtime over from the previous entry when the volume
111            // can't answer (file already gone, or replay without fixtures).
112            let fetched = fetch.stat(last.frn);
113            if fetched.is_none() {
114                stats.stat_failures += 1;
115            }
116            let carried = existing.map(|id| (idx.size(id), idx.mtime(id)));
117            let (size, mtime) = fetched.or(carried).unwrap_or((0, 0));
118            idx.upsert(&RawEntry {
119                parent_frn: Frn(last.parent_frn),
120                frn: Frn(last.frn),
121                name_utf16: &last.name,
122                is_dir: last.is_dir(),
123                is_reparse: last.is_reparse(),
124                is_hidden: last.is_hidden(),
125                is_system: last.is_system(),
126                size,
127                mtime,
128            });
129            stats.created_or_renamed += 1;
130        } else if a.reasons & STAT_REASONS != 0 {
131            // BASIC_INFO_CHANGE may have flipped hidden/system attributes.
132            idx.update_attrs(key, last.is_hidden(), last.is_system());
133            if let Some((size, mtime)) = fetch.stat(last.frn) {
134                if idx.update_stat(key, size, mtime).is_some() {
135                    stats.stat_updated += 1;
136                } else {
137                    stats.ignored += 1;
138                }
139            } else {
140                stats.stat_failures += 1;
141                stats.ignored += 1;
142            }
143        } else {
144            stats.ignored += 1;
145        }
146    }
147
148    idx.merge_new_into_permutations(first_new);
149    stats
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use crate::index::VolumeIndexBuilder;

    /// Encode `s` as the UTF-16 code units that records and raw entries carry.
    fn utf16(s: &str) -> Vec<u16> {
        s.encode_utf16().collect()
    }

    /// Journal record with `usn` fixed at 0; everything else is caller-chosen.
    fn rec(frn: u64, parent: u64, reason: u32, attrs: u32, name: &str) -> UsnRecord {
        let name = utf16(name);
        UsnRecord {
            usn: 0,
            frn,
            parent_frn: parent,
            reason,
            attributes: attrs,
            name,
        }
    }

    /// Fixture on volume `C:`: directory `docs` (record 10) directly under
    /// record 5, and file `docs\note.txt` (record 11, size 100, mtime 7).
    /// Both FRNs carry sequence number 1 in the upper 16 bits.
    fn base_index() -> VolumeIndex {
        let mut builder = VolumeIndexBuilder::new("C:", 5);
        let dir_name = utf16("docs");
        let file_name = utf16("note.txt");
        builder.push(RawEntry {
            parent_frn: Frn(5),
            frn: Frn((1 << 48) | 0x0A),
            name_utf16: &dir_name,
            is_dir: true,
            is_reparse: false,
            is_hidden: false,
            is_system: false,
            size: 0,
            mtime: 0,
        });
        builder.push(RawEntry {
            parent_frn: Frn(10),
            frn: Frn((1 << 48) | 0x0B),
            name_utf16: &file_name,
            is_dir: false,
            is_reparse: false,
            is_hidden: false,
            is_system: false,
            size: 100,
            mtime: 7,
        });
        builder.finish()
    }

    /// Full path of the entry stored under `record`; panics if it is absent.
    fn path_of(idx: &VolumeIndex, record: u64) -> String {
        let entry = idx.entry_by_record(record).unwrap();
        let mut bytes = Vec::new();
        idx.append_path(entry, &mut bytes);
        String::from_utf8(bytes).unwrap()
    }

    /// Fetcher answering every lookup with the same `(size, mtime)` pair.
    struct Fixed(u64, i64);
    impl StatFetcher for Fixed {
        fn stat(&self, _frn: u64) -> Option<(u64, i64)> {
            let Fixed(size, mtime) = *self;
            Some((size, mtime))
        }
    }

    #[test]
    fn create_in_new_dir_within_one_batch() {
        let mut index = base_index();
        let created = reason::FILE_CREATE | reason::CLOSE;
        let batch = [
            rec(20, 5, created, 0x10, "src"),
            rec(21, 20, created, 0x20, "main.rs"),
        ];

        let stats = apply_batch(&mut index, &batch, &Fixed(42, 9));

        assert_eq!(stats.created_or_renamed, 2);
        assert_eq!(path_of(&index, 21), r"C:\src\main.rs");
        let file = index.entry_by_record(21).unwrap();
        assert_eq!((index.size(file), index.mtime(file)), (42, 9));
    }

    #[test]
    fn rename_storm_collapses_to_final_name() {
        let mut index = base_index();
        let batch = [
            rec(11, 10, reason::RENAME_OLD_NAME, 0x20, "note.txt"),
            rec(11, 10, reason::RENAME_NEW_NAME, 0x20, "tmp1.txt"),
            rec(11, 10, reason::RENAME_OLD_NAME, 0x20, "tmp1.txt"),
            rec(11, 10, reason::RENAME_NEW_NAME | reason::CLOSE, 0x20, "final.txt"),
        ];

        let stats = apply_batch(&mut index, &batch, &NullStatFetcher);

        assert_eq!(stats.created_or_renamed, 1);
        assert_eq!(path_of(&index, 11), r"C:\docs\final.txt");
        // With no fetcher answer the old size/mtime must be carried across.
        let file = index.entry_by_record(11).unwrap();
        assert_eq!((index.size(file), index.mtime(file)), (100, 7));
    }

    #[test]
    fn move_to_other_dir_updates_child_paths() {
        let mut index = base_index();
        let make_archive = rec(20, 5, reason::FILE_CREATE | reason::CLOSE, 0x10, "archive");
        let move_docs = rec(10, 20, reason::RENAME_NEW_NAME | reason::CLOSE, 0x10, "docs");

        apply_batch(&mut index, &[make_archive, move_docs], &NullStatFetcher);

        // `docs` now lives under `archive`; the child's lazily built path follows.
        assert_eq!(path_of(&index, 11), r"C:\archive\docs\note.txt");
    }

    #[test]
    fn create_then_delete_in_one_batch_is_a_delete() {
        let mut index = base_index();
        let live_before = index.live_len();
        let batch = [
            rec(30, 5, reason::FILE_CREATE, 0x20, "ghost.tmp"),
            rec(30, 5, reason::FILE_DELETE | reason::CLOSE, 0x20, "ghost.tmp"),
        ];

        let stats = apply_batch(&mut index, &batch, &NullStatFetcher);

        // The file never made it into the index, so nothing is removed.
        assert_eq!(stats.deleted, 0);
        assert_eq!(stats.ignored, 1);
        assert_eq!(index.live_len(), live_before);
    }

    #[test]
    fn stat_update_changes_size_and_mtime() {
        let mut index = base_index();
        let grown = rec(11, 10, reason::DATA_EXTEND | reason::CLOSE, 0x20, "note.txt");

        let stats = apply_batch(&mut index, &[grown], &Fixed(5000, 99));

        assert_eq!(stats.stat_updated, 1);
        let file = index.entry_by_record(11).unwrap();
        assert_eq!((index.size(file), index.mtime(file)), (5000, 99));
    }

    #[test]
    fn delete_removes_from_results_and_generation_bumps() {
        let mut index = base_index();
        let generation_before = index.content_generation();
        let removed = rec(11, 10, reason::FILE_DELETE | reason::CLOSE, 0x20, "note.txt");

        let stats = apply_batch(&mut index, &[removed], &NullStatFetcher);

        assert_eq!(stats.deleted, 1);
        assert!(index.entry_by_record(11).is_none());
        assert_eq!(index.content_generation(), generation_before + 1);
    }

    #[test]
    fn renamed_entry_lands_sorted_in_permutation() {
        let mut index = base_index();
        let renamed = rec(
            11,
            10,
            reason::RENAME_NEW_NAME | reason::CLOSE,
            0x20,
            "aaa_first.txt",
        );

        apply_batch(&mut index, &[renamed], &NullStatFetcher);

        // Lower-cased names of live entries, in permutation order, must be
        // non-decreasing.
        let perm = index.name_permutation();
        let live_names: Vec<&[u8]> = perm
            .iter()
            .filter(|&&id| index.is_live(id))
            .map(|&id| index.lower_name(id))
            .collect();
        assert!(live_names.windows(2).all(|pair| pair[0] <= pair[1]));
    }
}