Skip to main content

fmf_core\engine/
worker.rs

//! The volume worker: the thread that drives one volume through
//! restore-or-scan → Ready → USN tailing → (journal-gone) rescan, until it is
//! stopped or the volume fails.
//! `volume.rs` is the state's home (`VolumeSlot`, checkpoint, save helper);
//! this file is the flow's home. Decisions are pure functions; effects
//! (counters, logs, events, installs, saves) stay in the loop, keyed off
//! the decisions — that split is what lets `worker_tests.rs` replay the
//! failure paths deterministically without elevation (ADR-0018, S4b).
//!
//! The full $MFT scan itself is deliberately *not* behind a seam (the
//! 2-trait cap): its execution stays real-volume territory, covered by the
//! `FMF_ADMIN_TESTS` layer.
12
13use std::sync::Arc;
14use std::sync::atomic::Ordering;
15use std::time::{Duration, Instant};
16
17use crate::metrics::{Counters, ScanTrace, UsnTrace};
18use crate::usn::{JournalGone, ReadOutcome, UsnError, UsnRecord, apply_batch};
19
20use super::seams::{JournalSource, JournalView, WinJournalSource};
21use super::volume::{JournalCheckpoint, VolumeSlot, WorkerKind};
22use super::watch::WatcherJournalSource;
23use super::{Engine, EngineEvent, VolumeState};
24
/// Debounce window for `EngineEvent::IndexChanged`, applied on the engine
/// side. It is the single throttle anywhere on the change path; see the
/// latency budget in docs/ARCHITECTURE.md.
const INDEX_CHANGED_DEBOUNCE: Duration = Duration::from_millis(200);
28
29/// How the worker establishes a volume's index at the top of its loop.
30#[derive(Clone, Copy, Debug, PartialEq, Eq)]
31pub(super) enum SnapshotDecision {
32    /// Install the loaded snapshot and replay the journal from its
33    /// persisted cursor.
34    Restore,
35    /// Build the index from a full $MFT scan.
36    FullScan(FullScanReason),
37}
38
39/// Why a full scan was chosen — selects the effect at the call site
40/// (counter + warn / info / silence). Effects stay out of the decision.
41#[derive(Clone, Copy, Debug, PartialEq, Eq)]
42pub(super) enum FullScanReason {
43    /// No snapshot on disk: the normal first run, not a failure.
44    FirstRun,
45    /// Snapshot present but unreadable/corrupt — counted in
46    /// `snapshot_load_failures`, never silent.
47    SnapshotUnusable,
48    /// Snapshot loaded, but its checkpoint cannot be replayed from the
49    /// live journal: the journal id changed, or the persisted cursor was
50    /// already purged (`next_usn < first_usn`).
51    CheckpointStale,
52    /// `FSCTL_QUERY_USN_JOURNAL` failed, so the checkpoint cannot be
53    /// validated and must not be trusted. Silent by design (pre-seam
54    /// behavior): a journal this broken fails the next open/read loudly.
55    JournalQueryFailed,
56}
57
58/// Pure decision: snapshot-load outcome × live-journal view → restore or
59/// full scan. The load result carries only the persisted checkpoint; the
60/// index itself never enters the decision.
61///
62/// `journal` is `None` when `FSCTL_QUERY` failed *or* was skipped because
63/// the load already failed (the worker never queries in that case — a
64/// failed load must not spend an FSCTL). The load arm is matched first,
65/// so the two `None` meanings cannot mix.
66pub(super) fn snapshot_decision(
67    load: Result<(u64, i64), &std::io::Error>,
68    journal: Option<JournalView>,
69) -> SnapshotDecision {
70    match load {
71        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
72            SnapshotDecision::FullScan(FullScanReason::FirstRun)
73        }
74        Err(_) => SnapshotDecision::FullScan(FullScanReason::SnapshotUnusable),
75        Ok((journal_id, next_usn)) => match journal {
76            Some(view) if journal_id == view.journal_id && next_usn >= view.first_usn => {
77                SnapshotDecision::Restore
78            }
79            Some(_) => SnapshotDecision::FullScan(FullScanReason::CheckpointStale),
80            None => SnapshotDecision::FullScan(FullScanReason::JournalQueryFailed),
81        },
82    }
83}
84
85/// What one blocking-read outcome means for the tail loop.
86pub(super) enum TailStep {
87    /// Apply the records to the index, then publish the new checkpoint —
88    /// in that order (see the checkpoint-after-apply invariant at the
89    /// apply site).
90    Apply {
91        records: Vec<UsnRecord>,
92        truncated: bool,
93    },
94    /// The journal id is dead. Recovery is always a full rescan
95    /// (docs/RESEARCH.md standard practice): invalidate the shared checkpoint, drop the
96    /// snapshot, announce Rescanning, restart the outer loop.
97    Rescan(JournalGone),
98    /// Unrecoverable read error — the volume goes Failed.
99    Fail(UsnError),
100}
101
102/// Pure decision: classify one blocking-read outcome into the worker's
103/// next step. Every `JournalGone` variant maps to a rescan — none is
104/// recoverable in place — and an FSCTL error is fatal for the volume.
105pub(super) fn journal_gone_action(outcome: Result<ReadOutcome, UsnError>) -> TailStep {
106    match outcome {
107        Ok(ReadOutcome::Records { records, truncated }) => TailStep::Apply { records, truncated },
108        Ok(ReadOutcome::Gone(gone)) => TailStep::Rescan(gone),
109        Err(e) => TailStep::Fail(e),
110    }
111}
112
113/// Outcome of the compaction generation recheck (pure half of
114/// `maybe_compact`).
115#[derive(Clone, Copy, Debug, PartialEq, Eq)]
116pub(super) enum CompactionVerdict {
117    Install,
118    Abort,
119}
120
121/// Pure decision: the compacted copy was built under a read guard at
122/// content generation `copied_at`; installing it is only sound if nothing
123/// advanced the generation in between. Single-writer invariant: this
124/// volume thread is the index's only writer, so `Abort` means that
125/// invariant broke somewhere and installing would silently lose the
126/// in-between mutations — the copy is discarded loudly instead.
127pub(super) fn compact_recheck(copied_at: u64, current: Option<u64>) -> CompactionVerdict {
128    if current == Some(copied_at) {
129        CompactionVerdict::Install
130    } else {
131        CompactionVerdict::Abort
132    }
133}
134
135impl Engine {
136    /// Production wiring: the Windows journal seam (the snapshot seam
137    /// lives on the slot, created by `index_start`).
138    #[cfg(windows)]
139    pub(super) fn volume_thread(self: Arc<Self>, slot: Arc<VolumeSlot>) {
140        // Pick the change-source seam by slot kind (ADR-0024). Clone the roots
141        // out of the borrow first so `slot` is free to move into the call.
142        let walk_roots = match &slot.kind {
143            WorkerKind::Mft => None,
144            WorkerKind::Walk { roots, .. } => Some(roots.clone()),
145        };
146        if let Some(roots) = walk_roots {
147            let mut journal = WatcherJournalSource::new(roots);
148            self.volume_thread_with(slot, &mut journal);
149        } else {
150            let mut journal = WinJournalSource::new(slot.label.clone());
151            self.volume_thread_with(slot, &mut journal);
152        }
153    }
154
155    /// Panic firewall: a crashing volume thread must never leave the UI
156    /// stuck on "Scanning" with no explanation. The panic itself is logged
157    /// (with backtrace) by the diag hook; this converts it into a visible
158    /// Failed state. Worker entry with an injectable journal seam —
159    /// production passes the Win implementation; `worker_tests.rs` passes
160    /// scripted fakes.
161    #[cfg(windows)]
162    pub(super) fn volume_thread_with(
163        self: Arc<Self>,
164        slot: Arc<VolumeSlot>,
165        journal: &mut dyn JournalSource,
166    ) {
167        let this = self.clone();
168        let slot2 = slot.clone();
169        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
170            this.volume_thread_inner(slot2, journal);
171        }));
172        if result.is_err() {
173            *slot.phase.lock() = VolumeState::Failed;
174            self.emit(EngineEvent::VolumeFailed {
175                volume: slot.label.clone(),
176                message: "internal panic — engine.log に詳細".to_string(),
177            });
178        }
179    }
180
181    #[cfg(windows)]
182    fn volume_thread_inner(
183        self: Arc<Self>,
184        slot: Arc<VolumeSlot>,
185        journal: &mut dyn JournalSource,
186    ) {
187        let label = slot.label.clone();
188        let store = slot.store.clone();
189
190        loop {
191            if slot.stop.load(Ordering::Relaxed) {
192                return;
193            }
194            // 1. Journal first (checkpoint precedes the scan so nothing is
195            //    missed), then snapshot-or-scan.
196            if let Err(e) = journal.open() {
197                *slot.phase.lock() = VolumeState::Failed;
198                self.emit(EngineEvent::VolumeFailed {
199                    volume: label,
200                    message: e.to_string(),
201                });
202                return;
203            }
204
205            let load_stage = std::time::Instant::now();
206            let load = store.load();
207            let journal_view = if load.is_ok() {
208                journal.query().ok()
209            } else {
210                None
211            };
212            let decision = snapshot_decision(
213                load.as_ref()
214                    .map(|(_, journal_id, next_usn)| (*journal_id, *next_usn)),
215                journal_view,
216            );
217
218            let idx = match decision {
219                SnapshotDecision::Restore => {
220                    let (idx, _journal_id, next_usn) =
221                        load.expect("Restore implies a loaded snapshot");
222                    journal.set_next_usn(next_usn);
223                    let load_ms = load_stage.elapsed().as_millis() as u64;
224                    tracing::info!(volume = %label, entries = idx.len(), ms = load_ms, "snapshot restored");
225                    let file_bytes = store.file_bytes();
226                    self.metrics.record_scan(ScanTrace {
227                        volume: label.clone(),
228                        source: "snapshot".to_string(),
229                        read_bytes: file_bytes,
230                        read_ms: 0,
231                        mb_per_s: 0.0,
232                        parse_ms: 0,
233                        deferred_ms: 0,
234                        build_ms: 0,
235                        sort_ms: 0,
236                        total_ms: load_ms,
237                        entries: idx.len() as u64,
238                        peak_ws_bytes: crate::mft::peak_working_set(),
239                    });
240                    idx
241                }
242                SnapshotDecision::FullScan(reason) => {
243                    match reason {
244                        FullScanReason::SnapshotUnusable => {
245                            // Corrupt/unreadable snapshot: rescanning
246                            // recovers, but the fact must not vanish.
247                            if let Err(e) = &load {
248                                Counters::bump(&self.metrics.counters.snapshot_load_failures);
249                                tracing::warn!(volume = %label, error = %e, "snapshot unusable — full scan");
250                            }
251                        }
252                        FullScanReason::CheckpointStale => {
253                            tracing::info!(volume = %label, "snapshot checkpoint stale — full scan");
254                        }
255                        FullScanReason::FirstRun | FullScanReason::JournalQueryFailed => {}
256                    }
257                    // A rejected snapshot must not stay resident while the
258                    // scan (and the tail session after it) runs.
259                    drop(load);
260                    // Initial-scan source by slot kind (ADR-0024): the $MFT
261                    // stream (elevated) or the non-elevated folder walk. Both
262                    // yield (VolumeIndex, ScanStats); the walk is infallible.
263                    let scanned = match &slot.kind {
264                        WorkerKind::Mft => crate::mft::scan_volume(&label),
265                        WorkerKind::Walk { roots, excludes } => {
266                            Ok(crate::scan::walk::walk_scan(roots, excludes))
267                        }
268                    };
269                    match scanned {
270                        Ok((mut idx, stats)) => {
271                            let scan_source = match &slot.kind {
272                                WorkerKind::Walk { .. } => "walk",
273                                WorkerKind::Mft => "scan",
274                            };
275                            tracing::info!(
276                                volume = %label,
277                                entries = idx.len(),
278                                ms = stats.elapsed_total_ms,
279                                // Normal scope-mode pruning (ADR-0025), 0 elsewhere;
280                                // surfaced here rather than as a degrade counter.
281                                excluded_pruned = stats.walk_excluded_pruned,
282                                "full scan complete"
283                            );
284                            idx.shrink_to_fit();
285                            Counters::add(
286                                &self.metrics.counters.corrupt_mft_records,
287                                stats.corrupt_records,
288                            );
289                            Counters::add(
290                                &self.metrics.counters.deferred_names_unresolved,
291                                stats.deferred_unresolved,
292                            );
293                            Counters::add(
294                                &self.metrics.counters.scan_pipeline_fallbacks,
295                                stats.pipeline_fallbacks,
296                            );
297                            // Single stats→counters mapping point: the scan
298                            // internals only return degradations in ScanStats,
299                            // never warn. A count add is needed, so instead of
300                            // degrade! (bump=+1 only) the add and warn sit on two
301                            // adjacent explicit lines, done indivisibly.
302                            if stats.ext_name_cache_skipped > 0 {
303                                Counters::add(
304                                    &self.metrics.counters.deferred_name_cache_overflow,
305                                    stats.ext_name_cache_skipped,
306                                );
307                                tracing::warn!(
308                                    volume = %label,
309                                    skipped = stats.ext_name_cache_skipped,
310                                    "extension-record name cache full — remainder resolved via disk reads"
311                                );
312                            }
313                            if stats.deferred_name_read_failures > 0 {
314                                Counters::add(
315                                    &self.metrics.counters.deferred_name_read_failures,
316                                    stats.deferred_name_read_failures,
317                                );
318                                tracing::warn!(
319                                    volume = %label,
320                                    failures = stats.deferred_name_read_failures,
321                                    "deferred-name disk reads failed — those names stay unresolved until rescan"
322                                );
323                            }
324                            // Scope-walk degradations (ADR-0024): same single
325                            // stats→counters+warn mapping point as the $MFT
326                            // ones above; zero for the privileged path.
327                            if stats.walk_read_errors > 0 {
328                                Counters::add(
329                                    &self.metrics.counters.walk_read_errors,
330                                    stats.walk_read_errors,
331                                );
332                                tracing::warn!(
333                                    volume = %label,
334                                    errors = stats.walk_read_errors,
335                                    "scope walk: paths skipped (unreadable) — absent until re-index"
336                                );
337                            }
338                            if stats.walk_depth_truncated > 0 {
339                                Counters::add(
340                                    &self.metrics.counters.walk_depth_truncated,
341                                    stats.walk_depth_truncated,
342                                );
343                                tracing::warn!(
344                                    volume = %label,
345                                    truncated = stats.walk_depth_truncated,
346                                    "scope walk: subtrees not descended (depth cap)"
347                                );
348                            }
349                            self.metrics.record_scan(ScanTrace {
350                                volume: label.clone(),
351                                source: scan_source.to_string(),
352                                read_bytes: stats.mft_bytes,
353                                read_ms: stats.elapsed_mft_load_ms,
354                                mb_per_s: if stats.elapsed_mft_load_ms > 0 {
355                                    stats.mft_bytes as f64
356                                        / 1_048.576
357                                        / stats.elapsed_mft_load_ms as f64
358                                } else {
359                                    0.0
360                                },
361                                parse_ms: stats.elapsed_parse_ms,
362                                deferred_ms: stats.elapsed_deferred_ms,
363                                build_ms: stats.elapsed_build_ms,
364                                sort_ms: stats.elapsed_sort_ms,
365                                total_ms: stats.elapsed_total_ms,
366                                entries: idx.len() as u64,
367                                peak_ws_bytes: stats.peak_working_set_bytes,
368                            });
369                            idx
370                        }
371                        Err(e) => {
372                            *slot.phase.lock() = VolumeState::Failed;
373                            self.emit(EngineEvent::VolumeFailed {
374                                volume: label,
375                                message: e.to_string(),
376                            });
377                            return;
378                        }
379                    }
380                }
381            };
382
383            let entries = idx.live_len() as u64;
384            *slot.scanned.lock() = entries;
385            slot.install_index(idx);
386            // Scan path: next_usn is the position at journal open (before the
387            // scan), so a flush taken now replays the scan window — correct,
388            // just slightly redundant.
389            *slot.checkpoint.lock() = Some(JournalCheckpoint {
390                journal_id: journal.journal_id(),
391                next_usn: journal.next_usn(),
392            });
393            *slot.phase.lock() = VolumeState::Ready;
394            self.emit(EngineEvent::VolumeReady {
395                volume: label.clone(),
396                entries,
397            });
398            // Prewarm the query accelerators (dir-path memo, offset table)
399            // so the first keystroke never pays the cold-cache cost.
400            if let Some(idx) = slot.index.read().as_ref() {
401                crate::query::prewarm(idx);
402            }
403
404            // 2. Tail the journal until stop or journal-gone.
405            let fetch = match journal.open_stat_fetcher() {
406                Ok(f) => f,
407                Err(e) => {
408                    self.emit(EngineEvent::VolumeFailed {
409                        volume: label,
410                        message: e.to_string(),
411                    });
412                    return;
413                }
414            };
415            let mut buf = Vec::new();
416            // None = "never emitted" → the first change emits immediately. Avoids
417            // `Instant - DEBOUNCE` and `checked_sub(..).unwrap()`, both of which
418            // panic at boot when uptime < DEBOUNCE.
419            let mut last_emit: Option<Instant> = None;
420            loop {
421                if slot.stop.load(Ordering::Relaxed) {
422                    self.save_slot(
423                        &slot,
424                        JournalCheckpoint {
425                            journal_id: journal.journal_id(),
426                            next_usn: journal.next_usn(),
427                        },
428                    );
429                    return;
430                }
431                match journal_gone_action(journal.read_blocking(&mut buf)) {
432                    TailStep::Apply {
433                        records: rs,
434                        truncated,
435                    } => {
436                        if truncated {
437                            Counters::bump(&self.metrics.counters.usn_batches_truncated);
438                            tracing::warn!(volume = %label, "USN batch had malformed tail bytes");
439                        }
440                        if rs.is_empty() {
441                            continue;
442                        }
443                        if let Some(idx) = slot.index.write().as_mut() {
444                            let stage = crate::metrics::Stage::start();
445                            let s = apply_batch(idx, &rs, fetch.as_ref());
446                            Counters::add(
447                                &self.metrics.counters.stat_fetch_failures,
448                                s.stat_failures as u64,
449                            );
450                            self.metrics.record_usn(UsnTrace {
451                                volume: label.clone(),
452                                records: rs.len() as u64,
453                                upserted: s.created_or_renamed as u64,
454                                deleted: s.deleted as u64,
455                                stat_updated: s.stat_updated as u64,
456                                stat_failures: s.stat_failures as u64,
457                                apply_us: stage.elapsed_us(),
458                            });
459                            *slot.scanned.lock() = idx.live_len() as u64;
460                        }
461                        // Concurrency invariant (checkpoint-after-apply):
462                        // the index is mutated first, the shared checkpoint
463                        // published second. A concurrent `Engine::flush`
464                        // reading the checkpoint first therefore always
465                        // saves checkpoint ≤ index — the USN replay on load
466                        // covers the gap (re-applying records is idempotent;
467                        // skipping them would not be — see
468                        // `JournalCheckpoint`). Do not reorder these two.
469                        *slot.checkpoint.lock() = Some(JournalCheckpoint {
470                            journal_id: journal.journal_id(),
471                            next_usn: journal.next_usn(),
472                        });
473                        self.maybe_compact(&slot);
474                        if last_emit.is_none_or(|t| t.elapsed() >= INDEX_CHANGED_DEBOUNCE) {
475                            last_emit = Some(Instant::now());
476                            self.emit(EngineEvent::IndexChanged {
477                                volume: label.clone(),
478                            });
479                        }
480                    }
481                    TailStep::Rescan(gone) => {
482                        Counters::bump(&self.metrics.counters.journal_rescans);
483                        tracing::warn!(volume = %label, ?gone, "journal gone — full rescan");
484                        // The old journal id is dead; a flush during the
485                        // rescan must not pair it with the new index.
486                        *slot.checkpoint.lock() = None;
487                        *slot.phase.lock() = VolumeState::Rescanning;
488                        self.emit(EngineEvent::RescanStarted {
489                            volume: label.clone(),
490                        });
491                        store.remove();
492                        break; // restart the outer loop → fresh journal + scan
493                    }
494                    TailStep::Fail(e) => {
495                        *slot.phase.lock() = VolumeState::Failed;
496                        self.emit(EngineEvent::VolumeFailed {
497                            volume: label,
498                            message: e.to_string(),
499                        });
500                        return;
501                    }
502                }
503            }
504        }
505    }
506
507    /// Compact once the tombstone/garbage thresholds trip (checked per
508    /// applied USN batch). The copy builds under a *read* guard — this
509    /// volume thread is the index's only writer — and the write lock is
510    /// held for the swap alone. `install_index` bumps the structural
511    /// generation, hard-staling open result handles.
512    pub(super) fn maybe_compact(&self, slot: &VolumeSlot) {
513        let compacted = {
514            let guard = slot.index.read();
515            let Some(idx) = guard.as_ref().filter(|idx| idx.compaction_due()) else {
516                return;
517            };
518            let stage = crate::metrics::Stage::start();
519            let generation = idx.content_generation();
520            let dropped = idx.len() - idx.live_len();
521            let new_idx = idx.compacted();
522            tracing::info!(
523                volume = %slot.label,
524                dropped_entries = dropped,
525                reclaimed_name_bytes = idx.stats(&slot.label).dead_name_bytes,
526                ms = stage.elapsed_us() / 1000,
527                "index compacted"
528            );
529            (new_idx, generation)
530        };
531        // Generation recheck between copy and swap — pure half in
532        // `compact_recheck` (see its doc for the single-writer invariant).
533        let guard = slot.index.read();
534        let current = guard
535            .as_ref()
536            .map(crate::index::VolumeIndex::content_generation);
537        drop(guard);
538        if compact_recheck(compacted.1, current) == CompactionVerdict::Abort {
539            Counters::bump(&self.metrics.counters.compaction_aborts);
540            tracing::warn!(
541                volume = %slot.label,
542                "index mutated during compaction — copy discarded"
543            );
544            return;
545        }
546        slot.install_index(compacted.0);
547        if let Some(idx) = slot.index.read().as_ref() {
548            crate::query::prewarm(idx);
549        }
550    }
551}
552
/// Test-only worker spawn with both seams (snapshot store and journal)
/// injected. Production goes through `index_start` → `volume_thread`; this
/// mirrors `index_start`'s slot construction and `fmf-vol-<label>` thread
/// naming exactly, and registers the slot and thread handle on the engine
/// the same way.
#[cfg(all(test, windows))]
impl Engine {
    pub(super) fn spawn_worker_with_seams(
        self: &Arc<Self>,
        label: &str,
        store: Arc<dyn super::seams::SnapshotStore>,
        mut journal: Box<dyn JournalSource>,
    ) {
        let thread_name = format!("fmf-vol-{label}");
        let worker_slot = Arc::new(VolumeSlot::scanning(label.to_string(), store));
        self.volumes.write().push(Arc::clone(&worker_slot));

        let worker_engine = Arc::clone(self);
        let handle = std::thread::Builder::new()
            .name(thread_name)
            .spawn(move || worker_engine.volume_thread_with(worker_slot, &mut *journal))
            .expect("spawn volume thread");
        self.threads.lock().push(handle);
    }
}