Skip to main content

fmf_core\query/
memo.rs

1use rayon::prelude::*;
2
3use crate::index::{EntryId, SortKey, VolumeIndex};
4
5// ── Pool offset table (generation-cached) ───────────────────────────────
6
/// Sorted (pool offset → entry id) table that maps pool-scan hits back to
/// entries. `name_off` loses monotonicity after renames (new names append),
/// so a sorted view is maintained per content generation via the index's
/// derived-cache slot — incrementally when possible ([`Self::extend_from`]):
/// pool appends are always at the tail, so a generation step is "copy +
/// append", never a re-sort.
///
/// Pairs whose entry has since moved its name (in-place dir renames) stay in
/// the table as *stale pairs*; the sweep detects them by comparing the
/// pair's offset against the entry's current `name_off` and skips the
/// region, exactly like the stale byte gaps it already handles.
pub(super) struct OffsetTable {
    /// Name offsets in ascending order; position `i` pairs with `ids[i]`.
    pub(super) offs: Vec<u32>,
    /// Entry id recorded for the offset at the same position in `offs`.
    pub(super) ids: Vec<EntryId>,
    /// Entries `[0, covers_entries)` are represented (possibly stale).
    covers_entries: u32,
    /// Pool bytes `[0, covers_pool_len)` are represented.
    covers_pool_len: u32,
    /// Pairs left behind by in-place renames since the last clean build
    /// (a clean build resets this to 0); compared against the entry count
    /// to decide when extending stops being worthwhile.
    stale_pairs: u32,
}
27
/// Past this stale fraction the copy-and-append loses to a clean rebuild:
/// `OffsetTable::extend_from` rebuilds once the stale-pair count exceeds
/// `entries / STALE_REBUILD_DIVISOR`.
const STALE_REBUILD_DIVISOR: u32 = 8;
30
31impl OffsetTable {
32    pub(super) fn build(idx: &VolumeIndex) -> Self {
33        let mut pairs: Vec<(u32, EntryId)> = (0..idx.len() as u32)
34            .map(|id| (idx.name_off_of(id), id))
35            .collect();
36        pairs.par_sort_unstable();
37        Self {
38            offs: pairs.iter().map(|p| p.0).collect(),
39            ids: pairs.iter().map(|p| p.1).collect(),
40            covers_entries: idx.len() as u32,
41            covers_pool_len: idx.lower_pool_bytes().len() as u32,
42            stale_pairs: 0,
43        }
44    }
45
46    /// Extend `prev` to the current generation without sorting: keep its
47    /// pairs (reusing the allocations when the cache held the last Arc,
48    /// copying otherwise), then append the pairs past its watermarks.
49    /// Appended entries arrive in offset order by construction; in-place
50    /// renamed dirs (their `name_off` jumped past the watermark) are merged
51    /// in and their old pairs left behind as stale.
52    pub(super) fn extend_from(idx: &VolumeIndex, prev: std::sync::Arc<Self>) -> Self {
53        let n = idx.len() as u32;
54        let pool_len = idx.lower_pool_bytes().len() as u32;
55        // Entries and pool are append-only within a structural generation —
56        // a regressed watermark means the cache got crossed with a different
57        // index. Rebuilding recovers; the fact must not vanish.
58        if prev.covers_entries > n || prev.covers_pool_len > pool_len {
59            crate::metrics::Counters::bump_offset_table_rebuild_fallbacks();
60            tracing::warn!(
61                covers_entries = prev.covers_entries,
62                entries = n,
63                covers_pool = prev.covers_pool_len,
64                pool = pool_len,
65                "offset table watermark regressed — falling back to a full rebuild"
66            );
67            return Self::build(idx);
68        }
69
70        // Appended entries: ids past the watermark; their offsets ascend
71        // with id (names append). In-place renamed dirs: old ids whose
72        // current offset jumped past the pool watermark.
73        let new_ids: Vec<EntryId> = (prev.covers_entries..n).collect();
74        let mut moved: Vec<(u32, EntryId)> = (0..prev.covers_entries)
75            .filter(|&id| idx.name_off_of(id) >= prev.covers_pool_len)
76            .map(|id| (idx.name_off_of(id), id))
77            .collect();
78        moved.sort_unstable();
79
80        let stale_pairs = prev.stale_pairs + moved.len() as u32;
81        if stale_pairs > n / STALE_REBUILD_DIVISOR {
82            tracing::debug!(
83                stale_pairs,
84                n,
85                "offset table stale fraction high — clean rebuild"
86            );
87            return Self::build(idx);
88        }
89
90        // In the common case no in-flight query still holds the previous
91        // table (the cache slot owned the only Arc) — reuse its allocations
92        // and skip the multi-MB copy entirely.
93        let added = new_ids.len() + moved.len();
94        let (mut offs, mut ids) = match std::sync::Arc::try_unwrap(prev) {
95            Ok(owned) => (owned.offs, owned.ids),
96            Err(shared) => (shared.offs.clone(), shared.ids.clone()),
97        };
98        offs.reserve(added);
99        ids.reserve(added);
100
101        // Two-pointer merge of the two off-sorted append lists.
102        let (mut i, mut j) = (0, 0);
103        while i < new_ids.len() || j < moved.len() {
104            let take_new = j >= moved.len()
105                || (i < new_ids.len() && idx.name_off_of(new_ids[i]) <= moved[j].0);
106            if take_new {
107                offs.push(idx.name_off_of(new_ids[i]));
108                ids.push(new_ids[i]);
109                i += 1;
110            } else {
111                offs.push(moved[j].0);
112                ids.push(moved[j].1);
113                j += 1;
114            }
115        }
116        debug_assert!(offs.is_sorted());
117
118        Self {
119            offs,
120            ids,
121            covers_entries: n,
122            covers_pool_len: pool_len,
123            stale_pairs,
124        }
125    }
126
127    pub(super) const fn len(&self) -> usize {
128        self.offs.len()
129    }
130}
131
132// ── Lazy sort permutations (generation-cached) ──────────────────────────
133
/// Pre-sorted id order for one sort key, built on the first query that
/// sorts by it and extended per content generation after that — the same
/// insertion-point in-place merge the name permutation uses, through the
/// same `cmp_by` order (ADR-0006).
///
/// Never persisted: a snapshot restore re-sorts on first use, which also
/// resets any staleness in-place stat updates accumulated.
#[derive(Clone)]
pub(super) struct SortPerm {
    /// Entry ids `[0, covers)` arranged in the sort key's `cmp_by` order.
    pub(super) ids: Vec<EntryId>,
    /// Entries `[0, covers)` are placed; a generation step sorts and
    /// merges only the ids past the watermark.
    covers: u32,
}
148
/// Size order (`SortKey::Size`) — a newtype over [`SortPerm`] so it gets
/// its own derived-cache slot (TypeId-keyed).
pub(super) struct SizePerm(pub(super) SortPerm);
/// Mtime order (`SortKey::Mtime`) — a separate newtype, hence a separate
/// slot.
pub(super) struct MtimePerm(pub(super) SortPerm);
153
154impl SizePerm {
155    pub(super) fn get(idx: &VolumeIndex) -> std::sync::Arc<Self> {
156        idx.cached_derived_or_update(|prev| match prev {
157            Some(p) => Self(SortPerm::extend(
158                idx,
159                take_perm(p, |m: &Self| &m.0),
160                SortKey::Size,
161            )),
162            None => Self(SortPerm::build(idx, SortKey::Size)),
163        })
164    }
165}
166
167impl MtimePerm {
168    pub(super) fn get(idx: &VolumeIndex) -> std::sync::Arc<Self> {
169        idx.cached_derived_or_update(|prev| match prev {
170            Some(p) => Self(SortPerm::extend(
171                idx,
172                take_perm(p, |m: &Self| &m.0),
173                SortKey::Mtime,
174            )),
175            None => Self(SortPerm::build(idx, SortKey::Mtime)),
176        })
177    }
178}
179
180/// Reuse the previous permutation's allocation when the cache slot held
181/// the only Arc, clone otherwise (same policy as the other derived caches).
182fn take_perm<T>(prev: std::sync::Arc<T>, perm_of: impl Fn(&T) -> &SortPerm) -> SortPerm
183where
184    SortPerm: From<T>,
185{
186    match std::sync::Arc::try_unwrap(prev) {
187        Ok(owned) => owned.into(),
188        Err(shared) => perm_of(&shared).clone(),
189    }
190}
191
192impl From<SizePerm> for SortPerm {
193    fn from(p: SizePerm) -> Self {
194        p.0
195    }
196}
197impl From<MtimePerm> for SortPerm {
198    fn from(p: MtimePerm) -> Self {
199        p.0
200    }
201}
202
203impl SortPerm {
204    fn build(idx: &VolumeIndex, key: SortKey) -> Self {
205        let mut ids: Vec<EntryId> = (0..idx.len() as u32).collect();
206        ids.par_sort_unstable_by(|&a, &b| idx.cmp_by(key, a, b));
207        Self {
208            ids,
209            covers: idx.len() as u32,
210        }
211    }
212
213    fn extend(idx: &VolumeIndex, mut perm: Self, key: SortKey) -> Self {
214        let n = idx.len() as u32;
215        // Entries are append-only within a structural generation — a
216        // regressed watermark means the cache got crossed with a different
217        // index. Rebuilding recovers; the fact must not vanish.
218        if perm.covers > n {
219            crate::metrics::Counters::bump_lazy_perm_rebuild_fallbacks();
220            tracing::warn!(
221                covers = perm.covers,
222                entries = n,
223                "lazy sort permutation watermark regressed — falling back to a full rebuild"
224            );
225            return Self::build(idx, key);
226        }
227        let mut batch: Vec<EntryId> = (perm.covers..n).collect();
228        batch.sort_unstable_by(|&a, &b| idx.cmp_by(key, a, b));
229        crate::index::merge_sorted_tail(&mut perm.ids, &batch, |a, b| idx.cmp_by(key, a, b));
230        perm.covers = n;
231        perm
232    }
233}
234
235// ── Dir-path memo (generation-cached, one per name pool) ────────────────
236
/// Memoized full paths for every directory, for one name pool. Only built
/// when the query contains path terms — and split per pool so a query only
/// pays for the pool(s) it reads (nearly all path queries are folded).
/// Entry paths are `memo[parent] + name`.
///
/// Across generations a memo extends incrementally as long as no existing
/// directory was renamed or moved (`dir_topology_generation`): appends
/// never change old dir paths, so new dirs just memoize on top. A topology
/// change rebuilds from scratch — dir renames are rare and the alternative
/// (subtree invalidation) is not worth its complexity.
pub(super) struct DirPathsPool {
    /// Indexed by entry id: `Some(path)` for directories, where `path` is
    /// the parent's memoized path + the dir's name + a trailing `\`;
    /// `None` for every non-directory entry.
    paths: Vec<Option<Box<[u8]>>>,
    /// Entries `[0, covers_entries)` are memoized.
    covers_entries: usize,
    /// The dir-topology generation this memo is valid for.
    topo_generation: u64,
}
254
/// Folded-name memo (built from `lower_name`) — its own derived-cache slot
/// (TypeId-keyed).
pub(super) struct DirPathsLower(DirPathsPool);
/// Original-name memo (built from `name`) — separate slot, built only by
/// orig-case path terms.
pub(super) struct DirPathsOrig(DirPathsPool);
259
260impl DirPathsLower {
261    pub(super) fn build(idx: &VolumeIndex) -> Self {
262        Self(DirPathsPool::build(idx, true))
263    }
264
265    pub(super) fn extend_from(idx: &VolumeIndex, prev: std::sync::Arc<Self>) -> Self {
266        Self(DirPathsPool::extend(idx, take_pool(prev, |m| &m.0), true))
267    }
268}
269
270impl DirPathsOrig {
271    pub(super) fn build(idx: &VolumeIndex) -> Self {
272        Self(DirPathsPool::build(idx, false))
273    }
274
275    pub(super) fn extend_from(idx: &VolumeIndex, prev: std::sync::Arc<Self>) -> Self {
276        Self(DirPathsPool::extend(idx, take_pool(prev, |m| &m.0), false))
277    }
278}
279
280/// Reuse the previous memo's allocations when the cache slot held the only
281/// Arc (the common case — no in-flight query still reads it), clone
282/// otherwise.
283fn take_pool<T>(prev: std::sync::Arc<T>, pool_of: impl Fn(&T) -> &DirPathsPool) -> DirPathsPool
284where
285    DirPathsPool: From<T>,
286{
287    match std::sync::Arc::try_unwrap(prev) {
288        Ok(owned) => owned.into(),
289        Err(shared) => {
290            let p = pool_of(&shared);
291            DirPathsPool {
292                paths: p.paths.clone(),
293                covers_entries: p.covers_entries,
294                topo_generation: p.topo_generation,
295            }
296        }
297    }
298}
299
300impl From<DirPathsLower> for DirPathsPool {
301    fn from(m: DirPathsLower) -> Self {
302        m.0
303    }
304}
305impl From<DirPathsOrig> for DirPathsPool {
306    fn from(m: DirPathsOrig) -> Self {
307        m.0
308    }
309}
310
311impl DirPathsPool {
312    #[inline]
313    fn name_of(idx: &VolumeIndex, id: EntryId, folded: bool) -> &[u8] {
314        if folded {
315            idx.lower_name(id)
316        } else {
317            idx.name(id)
318        }
319    }
320
321    /// Extend `pool` to the current generation: memoize only the appended
322    /// dirs. Their parents were live entries when pushed (lower id or
323    /// root), so one increasing-id pass resolves every prefix. Falls back
324    /// to a full build when a dir was renamed/moved (paths of arbitrary
325    /// descendants changed) — the normal policy switch, not a degradation.
326    fn extend(idx: &VolumeIndex, mut pool: Self, folded: bool) -> Self {
327        let n = idx.len();
328        if pool.topo_generation != idx.dir_topology_generation() || pool.covers_entries > n {
329            return Self::build(idx, folded);
330        }
331        pool.paths.resize(n, None);
332        for id in pool.covers_entries as u32..n as u32 {
333            if !idx.is_dir(id) {
334                continue;
335            }
336            let p = idx.parent(id) as usize;
337            let mut path = pool
338                .paths
339                .get(p)
340                .and_then(|x| x.as_deref())
341                .unwrap_or(&[])
342                .to_vec();
343            path.extend_from_slice(Self::name_of(idx, id, folded));
344            path.push(b'\\');
345            pool.paths[id as usize] = Some(path.into_boxed_slice());
346        }
347        pool.covers_entries = n;
348        pool
349    }
350
351    /// Level-order parallel build: a directory's path depends only on its
352    /// parent's (one level up), so each depth level fans out across cores.
353    fn build(idx: &VolumeIndex, folded: bool) -> Self {
354        let n = idx.len();
355        let mut memo = Self {
356            paths: vec![None; n],
357            covers_entries: n,
358            topo_generation: idx.dir_topology_generation(),
359        };
360
361        // Depth per directory via memoized chain walks (serial, O(n)).
362        let mut depth: Vec<u32> = vec![u32::MAX; n];
363        let mut stack: Vec<EntryId> = Vec::new();
364        let mut max_depth = 0u32;
365        let mut cycle_members = 0u64;
366        for id in 0..n as u32 {
367            if !idx.is_dir(id) {
368                continue;
369            }
370            let mut cur = id;
371            stack.clear();
372            while depth[cur as usize] == u32::MAX {
373                stack.push(cur);
374                if cur == VolumeIndex::ROOT {
375                    break;
376                }
377                cur = idx.parent(cur);
378                if stack.len() > 4096 {
379                    break; // corrupt parent cycle — treat as root-attached
380                }
381            }
382            while let Some(d) = stack.pop() {
383                let v = if d == VolumeIndex::ROOT {
384                    0
385                } else {
386                    match depth[idx.parent(d) as usize] {
387                        // Unresolved parent: `d` sits on a corrupt parent
388                        // cycle (or a >4096 chain). Attach it at the root —
389                        // u32::MAX must not propagate into max_depth, which
390                        // sizes the level table.
391                        u32::MAX => {
392                            cycle_members += 1;
393                            1
394                        }
395                        pd => pd + 1,
396                    }
397                };
398                depth[d as usize] = v;
399                max_depth = max_depth.max(v);
400            }
401        }
402        if cycle_members > 0 {
403            // No MetricsHub at this layer; the WARN lands in the diag ring
404            // (F12 panel + engine-error event), so the degradation is loud.
405            tracing::warn!(
406                cycle_members,
407                "corrupt parent chain while building dir paths — affected dirs attached at root"
408            );
409        }
410
411        let mut levels: Vec<Vec<EntryId>> = vec![Vec::new(); max_depth as usize + 1];
412        for id in 0..n as u32 {
413            if idx.is_dir(id) && depth[id as usize] != u32::MAX {
414                levels[depth[id as usize] as usize].push(id);
415            }
416        }
417
418        for level in levels {
419            let built: Vec<(EntryId, Box<[u8]>)> = level
420                .into_par_iter()
421                .map(|d| {
422                    let mut path = if d == VolumeIndex::ROOT {
423                        Vec::new()
424                    } else {
425                        let p = idx.parent(d) as usize;
426                        memo.paths[p].as_deref().unwrap_or(&[]).to_vec()
427                    };
428                    path.extend_from_slice(Self::name_of(idx, d, folded));
429                    path.push(b'\\');
430                    (d, path.into_boxed_slice())
431                })
432                .collect();
433            for (d, path) in built {
434                memo.paths[d as usize] = Some(path);
435            }
436        }
437        memo
438    }
439
440    #[inline]
441    fn parent_prefix(&self, parent: EntryId) -> &[u8] {
442        self.paths
443            .get(parent as usize)
444            .and_then(|p| p.as_deref())
445            .unwrap_or(&[])
446    }
447
448    fn bytes(&self) -> u64 {
449        let slots = self.paths.capacity() * std::mem::size_of::<Option<Box<[u8]>>>();
450        let boxed: usize = self
451            .paths
452            .iter()
453            .filter_map(|p| p.as_ref().map(|b| b.len()))
454            .sum();
455        (slots + boxed) as u64
456    }
457}
458
/// The path memos one query execution may read. `None` means the compiled
/// query proved it never reads that pool — most path queries are folded
/// and skip the original-name memo entirely.
#[derive(Default)]
pub(super) struct PathMemos {
    /// Folded-name directory paths; read by [`Self::lower_prefix`].
    pub(super) lower: Option<std::sync::Arc<DirPathsLower>>,
    /// Original-name directory paths; read by [`Self::orig_prefix`].
    pub(super) orig: Option<std::sync::Arc<DirPathsOrig>>,
}
467
468impl PathMemos {
469    #[inline]
470    pub(super) fn lower_prefix(&self, parent: EntryId) -> &[u8] {
471        self.lower
472            .as_ref()
473            .map_or(&[][..], |m| m.0.parent_prefix(parent))
474    }
475
476    #[inline]
477    pub(super) fn orig_prefix(&self, parent: EntryId) -> &[u8] {
478        self.orig
479            .as_ref()
480            .map_or(&[][..], |m| m.0.parent_prefix(parent))
481    }
482}
483
484/// Build the pool offset table ahead of the first query — the engine calls
485/// this once a volume turns Ready so no keystroke pays the cold cost.
486///
487/// The dir-path memo is *not* prewarmed: most sessions never issue a path
488/// query, and its footprint (full paths of every directory, ×2 pools) is
489/// real RAM. The first keystroke containing `\` or `path:` builds it once;
490/// after a USN batch it was already rebuilt lazily, so nothing regresses.
491pub fn prewarm(idx: &VolumeIndex) {
492    let _: std::sync::Arc<OffsetTable> = idx.cached_derived(|| OffsetTable::build(idx));
493}
494
495/// Bytes currently held by this index's derived caches (offset table +
496/// dir-path memos), for the RAM accounting in `IndexStats`. Probes only —
497/// never builds.
498pub fn derived_cache_bytes(idx: &VolumeIndex) -> u64 {
499    let mut total = 0u64;
500    if let Some(t) = idx.derived_probe::<OffsetTable>() {
501        total += (t.offs.capacity() * 4 + t.ids.capacity() * 4) as u64;
502    }
503    if let Some(d) = idx.derived_probe::<DirPathsLower>() {
504        total += d.0.bytes();
505    }
506    if let Some(d) = idx.derived_probe::<DirPathsOrig>() {
507        total += d.0.bytes();
508    }
509    if let Some(p) = idx.derived_probe::<SizePerm>() {
510        total += (p.0.ids.capacity() * 4) as u64;
511    }
512    if let Some(p) = idx.derived_probe::<MtimePerm>() {
513        total += (p.0.ids.capacity() * 4) as u64;
514    }
515    total
516}
517
518#[cfg(test)]
519mod tests {
520    use std::sync::Arc;
521
522    use super::*;
523    use crate::index::VolumeIndexBuilder;
524    use crate::index::testutil::{build_sample, raw, u16s};
525
526    /// 60-deep dir chain (well inside both the memo's 4096 and
527    /// `append_parent_path`'s 128 depth bounds) plus a multibyte dir and files.
528    fn deep_index() -> VolumeIndex {
529        let mut b = VolumeIndexBuilder::new("C:", 5);
530        for i in 0..60u64 {
531            let name = u16s(&format!("d{i:02}"));
532            let parent = if i == 0 { 5 } else { 99 + i };
533            b.push(raw(100 + i, parent, &name, true, 0, i as i64));
534        }
535        let jp = u16s("日本語フォルダ");
536        b.push(raw(300, 110, &jp, true, 0, 1)); // under d10
537        let note = u16s("Note.TXT");
538        b.push(raw(301, 300, &note, false, 9, 2));
539        let leaf = u16s("leaf.txt");
540        b.push(raw(302, 159, &leaf, false, 1, 3)); // under d59
541        b.finish()
542    }
543
544    /// Oracle: full path of `id` incl. trailing `\`, built from the parent
545    /// chain exactly like `VolumeIndex::append_path` does.
546    fn oracle_paths(idx: &VolumeIndex, id: EntryId) -> (Vec<u8>, Vec<u8>) {
547        let mut chain = vec![id];
548        let mut cur = id;
549        while cur != VolumeIndex::ROOT {
550            cur = idx.parent(cur);
551            chain.push(cur);
552        }
553        let (mut lower, mut orig) = (Vec::new(), Vec::new());
554        for &c in chain.iter().rev() {
555            lower.extend_from_slice(idx.lower_name(c));
556            lower.push(b'\\');
557            orig.extend_from_slice(idx.name(c));
558            orig.push(b'\\');
559        }
560        (lower, orig)
561    }
562
563    fn assert_memo_matches_oracle(idx: &VolumeIndex) {
564        let lower_memo = DirPathsLower::build(idx);
565        let orig_memo = DirPathsOrig::build(idx);
566        for id in 0..idx.len() as u32 {
567            if idx.is_dir(id) {
568                let (lower, orig) = oracle_paths(idx, id);
569                assert_eq!(
570                    lower_memo.0.paths[id as usize].as_deref(),
571                    Some(lower.as_slice()),
572                    "lower path of dir {id}"
573                );
574                assert_eq!(
575                    orig_memo.0.paths[id as usize].as_deref(),
576                    Some(orig.as_slice()),
577                    "orig path of dir {id}"
578                );
579                // And the oracle itself agrees with the core path builder.
580                // (append_path skips the root's own name — the volume label
581                // is rendered by callers via name(); see engine/results.rs —
582                // so the cross-check only applies below the root.)
583                if id != VolumeIndex::ROOT {
584                    let mut ap = Vec::new();
585                    idx.append_path(id, &mut ap);
586                    ap.push(b'\\');
587                    assert_eq!(orig, ap, "append_path oracle of dir {id}");
588                }
589            } else {
590                assert!(lower_memo.0.paths[id as usize].is_none());
591                assert!(orig_memo.0.paths[id as usize].is_none());
592            }
593        }
594    }
595
596    #[test]
597    fn dir_paths_match_append_path_oracle() {
598        assert_memo_matches_oracle(&deep_index());
599    }
600
    /// Oracle: an incrementally extended dir-path memo must equal a fresh
    /// build — across appended dirs (extend fast path) and dir renames /
    /// moves (topology bump → internal full rebuild). Both pools, since
    /// they extend independently in their own cache slots.
    ///
    /// The memos are threaded through four generation steps in order
    /// (append, in-place rename, reparent, file-only append); each step
    /// extends the memos produced by the previous one.
    #[test]
    fn extended_dir_paths_match_fresh_build() {
        // Slot-by-slot comparison of both memos against fresh builds of the
        // same index state; `what` labels the generation in failure output.
        let assert_same_as_fresh =
            |idx: &VolumeIndex, lower: &DirPathsLower, orig: &DirPathsOrig, what: &str| {
                let fresh_lower = DirPathsLower::build(idx);
                let fresh_orig = DirPathsOrig::build(idx);
                for id in 0..idx.len() {
                    assert_eq!(
                        lower.0.paths[id], fresh_lower.0.paths[id],
                        "{what}: lower of {id}"
                    );
                    assert_eq!(
                        orig.0.paths[id], fresh_orig.0.paths[id],
                        "{what}: orig of {id}"
                    );
                }
            };

        // Baseline: full builds of both pools on the untouched fixture.
        let mut idx = deep_index();
        let lower = DirPathsLower::build(&idx);
        let orig = DirPathsOrig::build(&idx);

        // Gen step 1: append a new dir under an existing one, a file in it,
        // and a nested dir under the *new* dir (parent inside the batch).
        let first_new = idx.len() as u32;
        let new_dir = u16s("new_dir");
        idx.upsert(&raw(500, 110, &new_dir, true, 0, 1));
        let new_file = u16s("new_file.txt");
        idx.upsert(&raw(501, 500, &new_file, false, 1, 2));
        let nested = u16s("nested");
        idx.upsert(&raw(502, 500, &nested, true, 0, 3));
        idx.merge_new_into_permutations(first_new);
        let lower = DirPathsLower::extend_from(&idx, Arc::new(lower));
        let orig = DirPathsOrig::extend_from(&idx, Arc::new(orig));
        assert_same_as_fresh(&idx, &lower, &orig, "append generation");

        // Gen step 2: in-place dir rename — topology bump, extend must
        // rebuild and descendants must reflect the new name.
        let renamed = u16s("renamed_mid");
        idx.rename_dir_in_place(110, &renamed, 109).unwrap();
        idx.merge_new_into_permutations(idx.len() as u32);
        let lower = DirPathsLower::extend_from(&idx, Arc::new(lower));
        let orig = DirPathsOrig::extend_from(&idx, Arc::new(orig));
        assert_same_as_fresh(&idx, &lower, &orig, "rename generation");

        // Gen step 3: dir move (reparent) — also a topology bump.
        idx.reparent(500, 100).unwrap();
        idx.merge_new_into_permutations(idx.len() as u32);
        let lower = DirPathsLower::extend_from(&idx, Arc::new(lower));
        let orig = DirPathsOrig::extend_from(&idx, Arc::new(orig));
        assert_same_as_fresh(&idx, &lower, &orig, "reparent generation");

        // File-only batches keep the fast path: same topology generation.
        let first_new = idx.len() as u32;
        let f2 = u16s("plain.txt");
        idx.upsert(&raw(503, 100, &f2, false, 1, 4));
        idx.merge_new_into_permutations(first_new);
        let topo_before = idx.dir_topology_generation();
        let lower = DirPathsLower::extend_from(&idx, Arc::new(lower));
        let orig = DirPathsOrig::extend_from(&idx, Arc::new(orig));
        // Extending must not itself bump the topology generation.
        assert_eq!(idx.dir_topology_generation(), topo_before);
        assert_same_as_fresh(&idx, &lower, &orig, "file-only generation");
    }
668
    /// Fresh memo builds must still match the parent-chain oracle after an
    /// in-place dir rename, and again after a subtree is reparented.
    #[test]
    fn dir_paths_follow_dir_rename_and_reparent() {
        let mut idx = deep_index();
        // In-place rename of a mid-chain dir: every descendant path shifts.
        let renamed = u16s("Renamed_D10");
        idx.rename_dir_in_place(110, &renamed, 109).unwrap();
        idx.merge_new_into_permutations(idx.len() as u32);
        assert_memo_matches_oracle(&idx);
        // Move a subtree (d30 under d02): depths change levels.
        idx.reparent(130, 102).unwrap();
        idx.merge_new_into_permutations(idx.len() as u32);
        assert_memo_matches_oracle(&idx);
    }
682
    /// `OffsetTable::build` must yield an offset-sorted table that holds
    /// every entry id exactly once with its current `name_off`, even when
    /// an in-place dir rename has made raw `name_off` non-monotonic in id.
    #[test]
    fn offset_table_reflects_non_monotonic_name_off_after_dir_rename() {
        let mut idx = build_sample();
        let docs = idx.entry_by_record(50).unwrap();
        let zzz = u16s("zzz_docs");
        idx.rename_dir_in_place(50, &zzz, 5).unwrap();
        idx.merge_new_into_permutations(idx.len() as u32);

        // Precondition: the rename made raw name_off non-monotonic.
        let raw_offs: Vec<u32> = (0..idx.len() as u32)
            .map(|id| idx.name_off_of(id))
            .collect();
        assert!(!raw_offs.is_sorted());

        let table = OffsetTable::build(&idx);
        assert_eq!(table.len(), idx.len());
        assert!(table.offs.is_sorted());
        // The id column is a permutation of all entry ids.
        let mut ids = table.ids.clone();
        ids.sort_unstable();
        assert_eq!(ids, (0..idx.len() as u32).collect::<Vec<_>>());
        // Every pair carries its entry's current offset (no stale pairs).
        for (off, &id) in table.offs.iter().zip(&table.ids) {
            assert_eq!(*off, idx.name_off_of(id), "table pair for entry {id}");
        }
        // The renamed dir's name was appended, so it sorts to the end.
        assert_eq!(*table.ids.last().unwrap(), docs);
    }
709
    /// Within one generation `cached_derived` hands back the same `Arc`
    /// per cached type without running the builder again; after the
    /// generation advances, each cached type is rebuilt.
    #[test]
    fn cached_derived_invalidates_on_content_generation_change() {
        let mut idx = build_sample();
        let d1 = idx.cached_derived(|| DirPathsLower::build(&idx));
        // Second lookup must hit the cache: the builder would panic.
        let d2: Arc<DirPathsLower> = idx.cached_derived(|| unreachable!("cache hit expected"));
        assert!(Arc::ptr_eq(&d1, &d2));
        // A second cached type joins the generation without evicting the first.
        let t1 = idx.cached_derived(|| OffsetTable::build(&idx));
        let d3: Arc<DirPathsLower> = idx.cached_derived(|| unreachable!("cache hit expected"));
        assert!(Arc::ptr_eq(&d1, &d3));

        idx.merge_new_into_permutations(idx.len() as u32); // no-op batch: gen +1
        let d4 = idx.cached_derived(|| DirPathsLower::build(&idx));
        assert!(
            !Arc::ptr_eq(&d1, &d4),
            "DirPathsLower must rebuild on a new generation"
        );
        let t2 = idx.cached_derived(|| OffsetTable::build(&idx));
        assert!(
            !Arc::ptr_eq(&t1, &t2),
            "OffsetTable must rebuild on a new generation"
        );
    }
733
734    /// Oracle: an incrementally extended table must yield exactly the same
735    /// sweep results as a freshly built one, across a chain of generations
736    /// with appends, file renames (tombstone+append) and in-place dir
737    /// renames. The tables themselves differ (stale pairs) — the *results*
738    /// must not.
739    #[test]
740    fn extended_table_sweeps_like_a_fresh_build() {
741        use super::super::compile::Driver;
742        use super::super::sweep::driver_candidates;
743        use memchr::memmem;
744
745        let assert_equiv = |idx: &VolumeIndex, table: &OffsetTable, what: &str| {
746            let fresh = OffsetTable::build(idx);
747            let drivers = [
748                Driver::Sub {
749                    finder: memmem::Finder::new(b"doc").into_owned(),
750                    needle_len: 3,
751                },
752                Driver::Sub {
753                    finder: memmem::Finder::new(b"renamed").into_owned(),
754                    needle_len: 7,
755                },
756                Driver::Prefix {
757                    bytes: b"note".to_vec(),
758                },
759                Driver::Suffixes {
760                    suffixes: vec![b".txt".to_vec()],
761                    files_only: true,
762                },
763            ];
764            for (i, d) in drivers.iter().enumerate() {
765                let mut a = driver_candidates(idx, table, d, true);
766                let mut b = driver_candidates(idx, &fresh, d, true);
767                a.sort_unstable();
768                b.sort_unstable();
769                assert_eq!(a, b, "{what}: driver {i} diverged from fresh build");
770            }
771        };
772
773        // ~100 entries so the stale threshold (n/8) tolerates a few in-place
774        // renames instead of flipping straight to a rebuild.
775        let mut b = VolumeIndexBuilder::new("C:", 5);
776        let dir = u16s("docs_dir");
777        b.push(raw(50, 5, &dir, true, 0, 1));
778        for i in 0..100u64 {
779            let name = if i % 3 == 0 {
780                u16s(&format!("note_{i:03}_doc.txt"))
781            } else {
782                u16s(&format!("file_{i:03}.log"))
783            };
784            let parent = if i % 2 == 0 { 50 } else { 5 };
785            b.push(raw(100 + i, parent, &name, false, i, i as i64));
786        }
787        let mut idx = b.finish();
788        let mut table = OffsetTable::build(&idx);
789
790        // Gen step 1: append a new file + rename an existing file
791        // (tombstone + append, same record).
792        let first_new = idx.len() as u32;
793        let extra = u16s("docs_extra.txt");
794        idx.upsert(&raw(9000, 5, &extra, false, 1, 7));
795        let renamed_file = u16s("note_renamed.txt");
796        idx.upsert(&raw(103, 50, &renamed_file, false, 9, 8));
797        idx.merge_new_into_permutations(first_new);
798        table = OffsetTable::extend_from(&idx, Arc::new(table));
799        assert_eq!(table.stale_pairs, 0, "appends produce no stale pairs");
800        assert_equiv(&idx, &table, "append generation");
801
802        // Gen step 2: in-place dir rename — the moved name strands a stale
803        // pair at its old offset.
804        let zzz = u16s("zzz_docs_renamed");
805        idx.rename_dir_in_place(50, &zzz, 5).unwrap();
806        idx.merge_new_into_permutations(idx.len() as u32);
807        table = OffsetTable::extend_from(&idx, Arc::new(table));
808        assert_eq!(table.stale_pairs, 1);
809        assert!(table.offs.is_sorted());
810        assert_equiv(&idx, &table, "dir-rename generation");
811
812        // Gen step 3: both at once.
813        let first_new = idx.len() as u32;
814        let more = u16s("more_docs.log");
815        idx.upsert(&raw(9001, 50, &more, false, 2, 9));
816        idx.merge_new_into_permutations(first_new);
817        let back = u16s("docs_back");
818        idx.rename_dir_in_place(50, &back, 5).unwrap();
819        idx.merge_new_into_permutations(idx.len() as u32);
820        table = OffsetTable::extend_from(&idx, Arc::new(table));
821        assert_eq!(table.stale_pairs, 2);
822        assert_equiv(&idx, &table, "mixed generation");
823    }
824
825    /// A high stale fraction flips the policy to a clean rebuild — same
826    /// results, watermark reset.
827    #[test]
828    fn extend_falls_back_to_rebuild_past_stale_threshold() {
829        let mut idx = build_sample(); // 4 entries → threshold n/8 = 0
830        let table = OffsetTable::build(&idx);
831        let renamed = u16s("docs_renamed");
832        idx.rename_dir_in_place(50, &renamed, 5).unwrap();
833        idx.merge_new_into_permutations(idx.len() as u32);
834
835        let extended = OffsetTable::extend_from(&idx, Arc::new(table));
836        assert_eq!(
837            extended.stale_pairs, 0,
838            "1 stale > 4/8 entries — must have rebuilt clean"
839        );
840        assert_eq!(extended.len(), idx.len());
841        for (off, &id) in extended.offs.iter().zip(&extended.ids) {
842            assert_eq!(*off, idx.name_off_of(id));
843        }
844    }
845
846    /// Oracle: an incrementally extended lazy sort permutation equals a
847    /// fresh parallel sort byte-for-byte across append/delete generations
848    /// (strict total order → the sorted result is unique).
849    #[test]
850    fn lazy_sort_perms_extend_like_fresh_builds() {
851        let mut idx = build_sample();
852        let mut size_perm = SortPerm::build(&idx, SortKey::Size);
853        let mut mtime_perm = SortPerm::build(&idx, SortKey::Mtime);
854        for g in 0..20u64 {
855            let first_new = idx.len() as u32;
856            let record = 200 + g;
857            // Mix sizes across the u32 overflow boundary.
858            let size = if g % 4 == 0 { (4u64 << 30) + g } else { g * 37 };
859            let name = u16s(&format!("lazy_{g}.bin"));
860            idx.upsert(&raw(record, 50, &name, false, size, (g * 13) as i64));
861            if g % 3 == 0 {
862                idx.delete(200 + g / 2);
863            }
864            idx.merge_new_into_permutations(first_new);
865
866            size_perm = SortPerm::extend(&idx, size_perm, SortKey::Size);
867            mtime_perm = SortPerm::extend(&idx, mtime_perm, SortKey::Mtime);
868            assert_eq!(
869                size_perm.ids,
870                SortPerm::build(&idx, SortKey::Size).ids,
871                "size order diverged at generation {g}"
872            );
873            assert_eq!(
874                mtime_perm.ids,
875                SortPerm::build(&idx, SortKey::Mtime).ids,
876                "mtime order diverged at generation {g}"
877            );
878        }
879    }
880
881    /// The cached lazy permutation survives stat updates as a complete
882    /// permutation (stale positions are pinned behavior), and `get`
883    /// caches within a generation / extends across one.
884    #[test]
885    fn size_perm_get_caches_and_stays_complete_under_stat_updates() {
886        let mut idx = build_sample();
887        let p1 = SizePerm::get(&idx);
888        let p2 = SizePerm::get(&idx);
889        assert!(Arc::ptr_eq(&p1, &p2), "same generation must cache-hit");
890        drop((p1, p2));
891
892        idx.update_stat(100, 999_999, 1).unwrap();
893        idx.merge_new_into_permutations(idx.len() as u32);
894        let p3 = SizePerm::get(&idx);
895        let mut seen: Vec<u32> = p3.0.ids.clone();
896        seen.sort_unstable();
897        assert_eq!(seen, (0..idx.len() as u32).collect::<Vec<_>>());
898    }
899
900    #[test]
901    fn prewarm_builds_offset_table_but_leaves_dir_paths_lazy() {
902        let idx = build_sample();
903        prewarm(&idx);
904        let _: Arc<OffsetTable> = idx.cached_derived(|| unreachable!("OffsetTable not prewarmed"));
905        // The dir-path memos only materialize on the first path query that
906        // reads them — and independently of each other.
907        assert!(idx.derived_probe::<DirPathsLower>().is_none());
908        assert!(idx.derived_probe::<DirPathsOrig>().is_none());
909        assert!(derived_cache_bytes(&idx) > 0, "offset table accounted");
910        let before = derived_cache_bytes(&idx);
911        let _ = idx.cached_derived(|| DirPathsLower::build(&idx));
912        let with_lower = derived_cache_bytes(&idx);
913        assert!(
914            with_lower > before,
915            "lower memo joins the accounting once built"
916        );
917        assert!(
918            idx.derived_probe::<DirPathsOrig>().is_none(),
919            "building the folded memo must not drag the orig memo in"
920        );
921        let _ = idx.cached_derived(|| DirPathsOrig::build(&idx));
922        assert!(
923            derived_cache_bytes(&idx) > with_lower,
924            "orig memo accounted separately"
925        );
926    }
927
928    #[test]
929    fn parent_cycle_attaches_dirs_at_root_instead_of_aborting() {
930        // Corrupt USN records can produce a parent cycle (a→b→a). Cycle
931        // members must come out root-attached, with paths intact — not
932        // abort via a u32::MAX depth poisoning the level-table size.
933        let mut b = VolumeIndexBuilder::new("C:", 5);
934        let (da, db, f) = (u16s("a"), u16s("b"), u16s("f.txt"));
935        b.push(raw(10, 5, &da, true, 0, 1));
936        b.push(raw(20, 10, &db, true, 0, 2));
937        b.push(raw(30, 20, &f, false, 1, 3));
938        let mut idx = b.finish();
939        idx.reparent(10, 20); // a under b while b is under a — cycle
940        let a = idx.entry_by_record(10).unwrap();
941        let bb = idx.entry_by_record(20).unwrap();
942
943        let memo = DirPathsLower::build(&idx);
944        for d in [a, bb] {
945            let lower = memo.0.paths[d as usize]
946                .as_deref()
947                .expect("cycle members still get a path");
948            assert!(lower.ends_with(b"\\"));
949        }
950    }
951}