Skip to main content

fmf_core\index/
frn.rs

1//! Record-number → `EntryId` lookup as a sorted permutation (ADR-0005): one
2//! id array maintained merge-only, exactly like the sort permutations.
3//! Appends collect in an unmerged tail that lookups scan linearly, and the
4//! end-of-batch merge folds them in sorted order. The keys are the `frn`
5//! column itself, read through the id indirection — a probe pays one extra
6//! cache miss, which only the USN path and the builder's parent resolution
7//! ever do (never a search).
8//!
9//! Deletions never touch the array: a tombstoned entry simply fails the
10//! liveness filter at lookup time. Renames (tombstone + append, same
11//! record) and NTFS record reuse therefore leave several ids per key, of
12//! which at most one is live — the invariant the mutation API upholds.
13
14use rayon::prelude::*;
15
16use super::{EntryId, Frn, RecordNo, flags};
17
/// Sorted-permutation index from record number to `EntryId`.
///
/// Only ids are stored. Every key comparison reads the `frn` column through
/// the stored id, which is why each method takes that column (and usually
/// the flag column) as an argument instead of owning a copy.
#[derive(Default)]
pub(super) struct FrnIndex {
    /// `EntryIds` ordered by (masked record number, id). Appended entries
    /// always carry the largest ids, so equal keys read old-before-new.
    /// May still contain tombstoned ids; lookups filter them by liveness.
    ids: Vec<EntryId>,
    /// Entries `[0, covers)` are represented; later ids are found by the
    /// tail scan until [`Self::merge_appended`] runs (end of USN batch).
    /// Acts as the watermark between the sorted part and the unmerged tail.
    covers: u32,
}
27
28#[inline]
29fn is_live(flag: &[u8], id: EntryId) -> bool {
30    flag[id as usize] & flags::TOMBSTONE == 0
31}
32
33impl FrnIndex {
34    /// Build from scratch over every live entry (initial scan finish,
35    /// snapshot restore).
36    pub(super) fn build(frn: &[u64], flag: &[u8]) -> Self {
37        let mut pairs: Vec<(RecordNo, EntryId)> = (0..frn.len() as u32)
38            .filter(|&id| is_live(flag, id))
39            .map(|id| (Frn(frn[id as usize]).record(), id))
40            .collect();
41        pairs.par_sort_unstable();
42        Self {
43            ids: pairs.iter().map(|p| p.1).collect(),
44            covers: frn.len() as u32,
45        }
46    }
47
48    /// The live entry for `key` (a record number), if any.
49    pub(super) fn lookup(&self, key: RecordNo, frn: &[u64], flag: &[u8]) -> Option<EntryId> {
50        // Unmerged tail first, newest first: within a batch the latest
51        // upsert for a record is the live one.
52        for id in (self.covers..frn.len() as u32).rev() {
53            if Frn(frn[id as usize]).record() == key && is_live(flag, id) {
54                return Some(id);
55            }
56        }
57        let key_of = |id: EntryId| Frn(frn[id as usize]).record();
58        let start = self.ids.partition_point(|&id| key_of(id) < key);
59        for &id in &self.ids[start..] {
60            if key_of(id) != key {
61                break;
62            }
63            if is_live(flag, id) {
64                return Some(id);
65            }
66        }
67        None
68    }
69
    /// Fold the appended entries into sorted order — live ones only;
    /// anything tombstoned before its first merge can never be looked up
    /// again. In place: each batch pair binary-searches its insertion point
    /// and the segments between insertion points move once (`copy_within`)
    /// — O(batch·log n) comparisons, no reallocation (ADR-0008).
    /// Equal keys keep old-before-new order; liveness never depends on
    /// that order anyway (at most one live pair per key).
    ///
    /// `frn` and `flag` are the full columns; the entries merged are those
    /// with ids in `[covers, frn.len())`. Afterwards `covers == frn.len()`,
    /// so the tail scan in `lookup` has nothing left to visit.
    pub(super) fn merge_appended(&mut self, frn: &[u64], flag: &[u8]) {
        let n = frn.len() as u32;
        // Live (key, id) pairs of the unmerged tail.
        let mut batch: Vec<(RecordNo, EntryId)> = (self.covers..n)
            .filter(|&id| is_live(flag, id))
            .map(|id| (Frn(frn[id as usize]).record(), id))
            .collect();
        // Advance the watermark even when nothing live was appended, so the
        // dead tail ids are not rescanned by later lookups.
        self.covers = n;
        if batch.is_empty() {
            return;
        }
        batch.sort_unstable();

        let old = self.ids.len();
        // NOTE(review): `reserve_bounded` lives in the parent module and is
        // not visible here — presumably it reserves room for the batch with
        // capped over-allocation; confirm against its definition.
        super::reserve_bounded(&mut self.ids, batch.len());
        // Grow by the batch size; the zero placeholders are all overwritten
        // by the merge below (the cursors meet exactly, see the assert).
        self.ids.resize(old + batch.len(), 0);
        let key_of = |id: EntryId| Frn(frn[id as usize]).record();
        let mut hi = old; // unmoved prefix of the old array (exclusive end)
        let mut k = old + batch.len(); // write cursor (exclusive end)
        // Merge from the back: largest batch key first, so every old id is
        // moved at most once and never overwritten before it is read.
        for j in (0..batch.len()).rev() {
            let (key, id) = batch[j];
            // First old slot whose key is strictly greater than `key`; the
            // `<=` keeps equal-key old ids in front of the new one.
            let pos = self.ids[..hi].partition_point(|&oid| key_of(oid) <= key);
            let seg = hi - pos;
            // Shift the old ids that sort after `key` up to their final
            // place (source and destination may overlap).
            self.ids.copy_within(pos..hi, k - seg);
            // Step over the moved segment plus one slot for the new id.
            k -= seg + 1;
            self.ids[k] = id;
            hi = pos;
        }
        // Whatever remains below `hi` is already in its final position.
        debug_assert_eq!(k, hi, "merge cursors must close");
    }
106
107    /// Remapped copy for compaction: dead ids drop out, survivors renumber.
108    /// Keys (masked record numbers) are copied unchanged by the caller and
109    /// the remap preserves relative id order, so the (key, id) order — and
110    /// with it lookup's binary search — survives without a re-sort.
111    pub(super) fn compact(&self, remap: &[EntryId], new_len: u32) -> Self {
112        debug_assert_eq!(
113            self.covers as usize,
114            remap.len(),
115            "compact at a batch boundary only (unmerged tail would be lost)"
116        );
117        Self {
118            ids: self
119                .ids
120                .iter()
121                .filter_map(|&id| match remap[id as usize] {
122                    super::NO_PARENT => None,
123                    new_id => Some(new_id),
124                })
125                .collect(),
126            covers: new_len,
127        }
128    }
129
130    pub(super) const fn bytes(&self) -> u64 {
131        (self.ids.capacity() * 4) as u64
132    }
133
134    pub(super) fn shrink_to_fit(&mut self) {
135        self.ids.shrink_to_fit();
136    }
137}
138
139#[cfg(test)]
140mod tests {
141    use super::EntryId;
142    use crate::index::RecordNo;
143    use crate::index::testutil::{build_sample, raw, u16s};
144
145    /// Reference implementation (forward full merge): equal keys take the
146    /// old pair first.
147    fn forward_merge_reference(
148        keys: &mut Vec<RecordNo>,
149        ids: &mut Vec<EntryId>,
150        batch: &[(RecordNo, EntryId)],
151    ) {
152        let mut nk = Vec::with_capacity(keys.len() + batch.len());
153        let mut ni = Vec::with_capacity(ids.len() + batch.len());
154        let (mut i, mut j) = (0, 0);
155        while i < keys.len() && j < batch.len() {
156            if keys[i] <= batch[j].0 {
157                nk.push(keys[i]);
158                ni.push(ids[i]);
159                i += 1;
160            } else {
161                nk.push(batch[j].0);
162                ni.push(batch[j].1);
163                j += 1;
164            }
165        }
166        nk.extend_from_slice(&keys[i..]);
167        ni.extend_from_slice(&ids[i..]);
168        for &(k, id) in &batch[j..] {
169            nk.push(k);
170            ni.push(id);
171        }
172        *keys = nk;
173        *ids = ni;
174    }
175
    /// Random rename/delete storms: the in-place merge must produce the
    /// byte-identical id array the forward reference does, batch after
    /// batch (record keys are never mutated in place, so both orders stay
    /// truly sorted and the sorted stable merge is unique). The reference
    /// carries explicit key/id pairs — the production array dropped the
    /// key copy and reads keys through the id indirection, so the derived
    /// key sequence is asserted too.
    #[test]
    fn in_place_merge_is_byte_identical_to_the_forward_reference() {
        let mut idx = build_sample();
        // Seed the reference with the sample's current id array and the
        // keys derived from it.
        let mut ref_ids = idx.frn_index.ids.clone();
        let mut ref_keys: Vec<RecordNo> = ref_ids.iter().map(|&id| idx.frn(id).record()).collect();
        // Fixed-seed xorshift generator: the storm is random-looking but
        // reproducible from run to run.
        let mut state = 0x1234_5678_9ABC_DEF0u64;
        let mut rng = move || {
            state ^= state << 13;
            state ^= state >> 7;
            state ^= state << 17;
            state
        };

        // 100 batches, each followed by a merge.
        for _ in 0..100 {
            let first_new = idx.len() as u32;
            // One to eight operations per batch on records 100..=139;
            // two draws out of three upsert, the third deletes.
            for _ in 0..=(rng() % 8) {
                let record = 100 + rng() % 40;
                if rng() % 3 < 2 {
                    let name = u16s(&format!("f{}.txt", rng() % 1000));
                    idx.upsert(&raw(record, 50, &name, false, 1, 1));
                } else {
                    // Result ignored on purpose — presumably the drawn
                    // record may not be live at this point.
                    idx.delete(record);
                }
            }
            // Mirror merge_appended's batch input exactly (live tail only).
            let mut batch: Vec<(RecordNo, EntryId)> = (first_new..idx.len() as u32)
                .filter(|&id| idx.is_live(id))
                .map(|id| (idx.frn(id).record(), id))
                .collect();
            batch.sort_unstable();
            forward_merge_reference(&mut ref_keys, &mut ref_ids, &batch);

            idx.merge_new_into_permutations(first_new);
            // The id arrays must match element for element.
            assert_eq!(idx.frn_index.ids, ref_ids);
            // And the keys read back through the ids must match the keys
            // the reference carried explicitly.
            let derived_keys: Vec<RecordNo> = idx
                .frn_index
                .ids
                .iter()
                .map(|&id| idx.frn(id).record())
                .collect();
            assert_eq!(derived_keys, ref_keys);
        }
    }
226
227    /// The latest upsert of a record must win — found via the unmerged
228    /// tail during the batch and via the sorted arrays after the merge,
229    /// with identical results.
230    #[test]
231    fn rename_storm_resolves_to_the_latest_before_and_after_merge() {
232        let mut idx = build_sample();
233        let live_before = idx.live_len();
234        let first_new = idx.len() as u32;
235        let mut last = None;
236        for i in 0..5u64 {
237            let name = u16s(&format!("storm_{i}.txt"));
238            last = Some(idx.upsert(&raw(100, 50, &name, false, 1, i as i64)));
239        }
240        let tail_hit = idx.entry_by_record(100).unwrap();
241        assert_eq!(Some(tail_hit), last, "tail scan must see the newest");
242        assert_eq!(idx.name(tail_hit), b"storm_4.txt");
243
244        idx.merge_new_into_permutations(first_new);
245        assert_eq!(idx.entry_by_record(100), last, "sorted lookup agrees");
246        assert_eq!(idx.live_len(), live_before, "storm nets zero live change");
247    }
248
249    /// NTFS reuses record numbers: a delete followed by a create of the
250    /// same record must resolve to the new entry, never the tombstone.
251    #[test]
252    fn record_reuse_after_delete_resolves_to_the_new_entry() {
253        let mut idx = build_sample();
254        idx.delete(60).unwrap();
255        idx.merge_new_into_permutations(idx.len() as u32);
256        assert_eq!(idx.entry_by_record(60), None, "deleted record misses");
257
258        let first_new = idx.len() as u32;
259        let name = u16s("reborn.txt");
260        let id = idx.upsert(&raw(60, 50, &name, false, 7, 7));
261        assert_eq!(idx.entry_by_record(60), Some(id), "tail finds the rebirth");
262        idx.merge_new_into_permutations(first_new);
263        assert_eq!(idx.entry_by_record(60), Some(id), "merge keeps it");
264        assert_eq!(idx.name(id), b"reborn.txt");
265    }
266
267    /// An entry created and deleted within one batch never surfaces, and
268    /// the merge does not resurrect it.
269    #[test]
270    fn create_then_delete_within_a_batch_stays_gone() {
271        let mut idx = build_sample();
272        let first_new = idx.len() as u32;
273        let name = u16s("flash.tmp");
274        idx.upsert(&raw(777, 50, &name, false, 1, 1));
275        assert!(idx.entry_by_record(777).is_some());
276        idx.delete(777).unwrap();
277        assert_eq!(idx.entry_by_record(777), None, "gone in the tail");
278        idx.merge_new_into_permutations(first_new);
279        assert_eq!(idx.entry_by_record(777), None, "gone after the merge");
280    }
281}