fmf_core\engine/worker.rs
1//! The volume worker: the thread that drives one volume through
2//! restore-or-scan → Ready → USN tailing → (journal-gone) rescan, forever.
3//! `volume.rs` is the state's home (`VolumeSlot`, checkpoint, save helper);
4//! this file is the flow's home. Decisions are pure functions; effects
5//! (counters, logs, events, installs, saves) stay in the loop, keyed off
6//! the decisions — that split is what lets `worker_tests.rs` replay the
7//! failure paths deterministically without elevation (ADR-0018, S4b).
8//!
9//! The full $MFT scan itself is deliberately *not* behind a seam (the
10//! 2-trait cap): its execution stays real-volume territory, covered by the
11//! `FMF_ADMIN_TESTS` layer.
12
13use std::sync::Arc;
14use std::sync::atomic::Ordering;
15use std::time::{Duration, Instant};
16
17use crate::metrics::{Counters, ScanTrace, UsnTrace};
18use crate::usn::{JournalGone, ReadOutcome, UsnError, UsnRecord, apply_batch};
19
20use super::seams::{JournalSource, JournalView, WinJournalSource};
21use super::volume::{JournalCheckpoint, VolumeSlot, WorkerKind};
22use super::watch::WatcherJournalSource;
23use super::{Engine, EngineEvent, VolumeState};
24
/// Minimum spacing between two `IndexChanged` events emitted by a volume
/// worker. Per docs/ARCHITECTURE.md (latency budget) this engine-side
/// debounce is the only throttle anywhere in the change path.
const INDEX_CHANGED_DEBOUNCE: Duration = Duration::from_millis(200);
28
29/// How the worker establishes a volume's index at the top of its loop.
30#[derive(Clone, Copy, Debug, PartialEq, Eq)]
31pub(super) enum SnapshotDecision {
32 /// Install the loaded snapshot and replay the journal from its
33 /// persisted cursor.
34 Restore,
35 /// Build the index from a full $MFT scan.
36 FullScan(FullScanReason),
37}
38
39/// Why a full scan was chosen — selects the effect at the call site
40/// (counter + warn / info / silence). Effects stay out of the decision.
41#[derive(Clone, Copy, Debug, PartialEq, Eq)]
42pub(super) enum FullScanReason {
43 /// No snapshot on disk: the normal first run, not a failure.
44 FirstRun,
45 /// Snapshot present but unreadable/corrupt — counted in
46 /// `snapshot_load_failures`, never silent.
47 SnapshotUnusable,
48 /// Snapshot loaded, but its checkpoint cannot be replayed from the
49 /// live journal: the journal id changed, or the persisted cursor was
50 /// already purged (`next_usn < first_usn`).
51 CheckpointStale,
52 /// `FSCTL_QUERY_USN_JOURNAL` failed, so the checkpoint cannot be
53 /// validated and must not be trusted. Silent by design (pre-seam
54 /// behavior): a journal this broken fails the next open/read loudly.
55 JournalQueryFailed,
56}
57
58/// Pure decision: snapshot-load outcome × live-journal view → restore or
59/// full scan. The load result carries only the persisted checkpoint; the
60/// index itself never enters the decision.
61///
62/// `journal` is `None` when `FSCTL_QUERY` failed *or* was skipped because
63/// the load already failed (the worker never queries in that case — a
64/// failed load must not spend an FSCTL). The load arm is matched first,
65/// so the two `None` meanings cannot mix.
66pub(super) fn snapshot_decision(
67 load: Result<(u64, i64), &std::io::Error>,
68 journal: Option<JournalView>,
69) -> SnapshotDecision {
70 match load {
71 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
72 SnapshotDecision::FullScan(FullScanReason::FirstRun)
73 }
74 Err(_) => SnapshotDecision::FullScan(FullScanReason::SnapshotUnusable),
75 Ok((journal_id, next_usn)) => match journal {
76 Some(view) if journal_id == view.journal_id && next_usn >= view.first_usn => {
77 SnapshotDecision::Restore
78 }
79 Some(_) => SnapshotDecision::FullScan(FullScanReason::CheckpointStale),
80 None => SnapshotDecision::FullScan(FullScanReason::JournalQueryFailed),
81 },
82 }
83}
84
85/// What one blocking-read outcome means for the tail loop.
86pub(super) enum TailStep {
87 /// Apply the records to the index, then publish the new checkpoint —
88 /// in that order (see the checkpoint-after-apply invariant at the
89 /// apply site).
90 Apply {
91 records: Vec<UsnRecord>,
92 truncated: bool,
93 },
94 /// The journal id is dead. Recovery is always a full rescan
95 /// (docs/RESEARCH.md standard practice): invalidate the shared checkpoint, drop the
96 /// snapshot, announce Rescanning, restart the outer loop.
97 Rescan(JournalGone),
98 /// Unrecoverable read error — the volume goes Failed.
99 Fail(UsnError),
100}
101
102/// Pure decision: classify one blocking-read outcome into the worker's
103/// next step. Every `JournalGone` variant maps to a rescan — none is
104/// recoverable in place — and an FSCTL error is fatal for the volume.
105pub(super) fn journal_gone_action(outcome: Result<ReadOutcome, UsnError>) -> TailStep {
106 match outcome {
107 Ok(ReadOutcome::Records { records, truncated }) => TailStep::Apply { records, truncated },
108 Ok(ReadOutcome::Gone(gone)) => TailStep::Rescan(gone),
109 Err(e) => TailStep::Fail(e),
110 }
111}
112
113/// Outcome of the compaction generation recheck (pure half of
114/// `maybe_compact`).
115#[derive(Clone, Copy, Debug, PartialEq, Eq)]
116pub(super) enum CompactionVerdict {
117 Install,
118 Abort,
119}
120
121/// Pure decision: the compacted copy was built under a read guard at
122/// content generation `copied_at`; installing it is only sound if nothing
123/// advanced the generation in between. Single-writer invariant: this
124/// volume thread is the index's only writer, so `Abort` means that
125/// invariant broke somewhere and installing would silently lose the
126/// in-between mutations — the copy is discarded loudly instead.
127pub(super) fn compact_recheck(copied_at: u64, current: Option<u64>) -> CompactionVerdict {
128 if current == Some(copied_at) {
129 CompactionVerdict::Install
130 } else {
131 CompactionVerdict::Abort
132 }
133}
134
135impl Engine {
/// Production entry point: builds the journal seam that matches the slot's
/// kind and hands over to `volume_thread_with` (the snapshot seam already
/// lives on the slot, created by `index_start`).
#[cfg(windows)]
pub(super) fn volume_thread(self: Arc<Self>, slot: Arc<VolumeSlot>) {
    // Seam selection by slot kind (ADR-0024). The roots are cloned out of
    // the borrow up front so that `slot` can be moved into the call below.
    let walk_roots = if let WorkerKind::Walk { roots, .. } = &slot.kind {
        Some(roots.clone())
    } else {
        None
    };
    match walk_roots {
        Some(roots) => {
            let mut watcher = WatcherJournalSource::new(roots);
            self.volume_thread_with(slot, &mut watcher);
        }
        None => {
            let mut usn = WinJournalSource::new(slot.label.clone());
            self.volume_thread_with(slot, &mut usn);
        }
    }
}
154
/// Worker entry with an injectable journal seam: production passes the
/// Win implementation, `worker_tests.rs` passes scripted fakes.
///
/// Doubles as the panic firewall. A volume thread that crashes must not
/// leave the UI sitting on "Scanning" with no explanation, so a panic
/// escaping the worker body is turned into a visible Failed state plus a
/// `VolumeFailed` event. The panic itself (with backtrace) is logged by
/// the diag hook, outside this function.
#[cfg(windows)]
pub(super) fn volume_thread_with(
    self: Arc<Self>,
    slot: Arc<VolumeSlot>,
    journal: &mut dyn JournalSource,
) {
    // The worker body consumes its own handles; keep the originals so the
    // failure can still be reported afterwards.
    let worker = Arc::clone(&self);
    let worker_slot = Arc::clone(&slot);
    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
        worker.volume_thread_inner(worker_slot, journal)
    }));
    if outcome.is_ok() {
        return;
    }
    *slot.phase.lock() = VolumeState::Failed;
    let volume = slot.label.clone();
    let message = "internal panic — engine.log に詳細".to_string();
    self.emit(EngineEvent::VolumeFailed { volume, message });
}
180
/// The worker body for one volume. Loops until the slot's stop flag is
/// observed or an unrecoverable error marks the volume Failed.
///
/// Each pass of the outer loop:
/// 1. opens the journal, loads the snapshot, and lets `snapshot_decision`
///    choose between restoring it and running a full scan;
/// 2. installs the resulting index, publishes the journal checkpoint,
///    marks the slot Ready, emits `VolumeReady`, and prewarms the query
///    accelerators;
/// 3. tails the journal, applying each batch and then publishing the new
///    checkpoint, until stop (saves the slot and returns), journal-gone
///    (breaks back to step 1 for a rescan), or a read error (Failed).
///
/// Every exit that emits `VolumeFailed` first sets the slot phase to
/// `Failed`, so the phase and the event stream agree.
#[cfg(windows)]
fn volume_thread_inner(
    self: Arc<Self>,
    slot: Arc<VolumeSlot>,
    journal: &mut dyn JournalSource,
) {
    let label = slot.label.clone();
    let store = slot.store.clone();

    loop {
        if slot.stop.load(Ordering::Relaxed) {
            return;
        }
        // 1. Journal first (checkpoint precedes the scan so nothing is
        // missed), then snapshot-or-scan.
        if let Err(e) = journal.open() {
            *slot.phase.lock() = VolumeState::Failed;
            self.emit(EngineEvent::VolumeFailed {
                volume: label,
                message: e.to_string(),
            });
            return;
        }

        let load_stage = Instant::now();
        let load = store.load();
        // The journal is queried only when there is a loaded checkpoint to
        // validate; a failed load leads to a full scan regardless.
        let journal_view = if load.is_ok() {
            journal.query().ok()
        } else {
            None
        };
        let decision = snapshot_decision(
            load.as_ref()
                .map(|(_, journal_id, next_usn)| (*journal_id, *next_usn)),
            journal_view,
        );

        let idx = match decision {
            SnapshotDecision::Restore => {
                let (idx, _journal_id, next_usn) =
                    load.expect("Restore implies a loaded snapshot");
                // Resume tailing from the cursor persisted with the snapshot.
                journal.set_next_usn(next_usn);
                let load_ms = load_stage.elapsed().as_millis() as u64;
                tracing::info!(volume = %label, entries = idx.len(), ms = load_ms, "snapshot restored");
                let file_bytes = store.file_bytes();
                // A restore has no scan stages; only the total time and
                // the snapshot file size are meaningful in the trace.
                self.metrics.record_scan(ScanTrace {
                    volume: label.clone(),
                    source: "snapshot".to_string(),
                    read_bytes: file_bytes,
                    read_ms: 0,
                    mb_per_s: 0.0,
                    parse_ms: 0,
                    deferred_ms: 0,
                    build_ms: 0,
                    sort_ms: 0,
                    total_ms: load_ms,
                    entries: idx.len() as u64,
                    peak_ws_bytes: crate::mft::peak_working_set(),
                });
                idx
            }
            SnapshotDecision::FullScan(reason) => {
                match reason {
                    FullScanReason::SnapshotUnusable => {
                        // Corrupt/unreadable snapshot: rescanning
                        // recovers, but the fact must not vanish.
                        if let Err(e) = &load {
                            Counters::bump(&self.metrics.counters.snapshot_load_failures);
                            tracing::warn!(volume = %label, error = %e, "snapshot unusable — full scan");
                        }
                    }
                    FullScanReason::CheckpointStale => {
                        tracing::info!(volume = %label, "snapshot checkpoint stale — full scan");
                    }
                    FullScanReason::FirstRun | FullScanReason::JournalQueryFailed => {}
                }
                // A rejected snapshot must not stay resident while the
                // scan (and the tail session after it) runs.
                drop(load);
                // Initial-scan source by slot kind (ADR-0024): the $MFT
                // stream (elevated) or the non-elevated folder walk. Both
                // yield (VolumeIndex, ScanStats); the walk is infallible.
                let scanned = match &slot.kind {
                    WorkerKind::Mft => crate::mft::scan_volume(&label),
                    WorkerKind::Walk { roots, excludes } => {
                        Ok(crate::scan::walk::walk_scan(roots, excludes))
                    }
                };
                match scanned {
                    Ok((mut idx, stats)) => {
                        let scan_source = match &slot.kind {
                            WorkerKind::Walk { .. } => "walk",
                            WorkerKind::Mft => "scan",
                        };
                        tracing::info!(
                            volume = %label,
                            entries = idx.len(),
                            ms = stats.elapsed_total_ms,
                            // Normal scope-mode pruning (ADR-0025), 0 elsewhere;
                            // surfaced here rather than as a degrade counter.
                            excluded_pruned = stats.walk_excluded_pruned,
                            "full scan complete"
                        );
                        idx.shrink_to_fit();
                        Counters::add(
                            &self.metrics.counters.corrupt_mft_records,
                            stats.corrupt_records,
                        );
                        Counters::add(
                            &self.metrics.counters.deferred_names_unresolved,
                            stats.deferred_unresolved,
                        );
                        Counters::add(
                            &self.metrics.counters.scan_pipeline_fallbacks,
                            stats.pipeline_fallbacks,
                        );
                        // Single stats→counters mapping point: the scan
                        // internals only return degradations in ScanStats,
                        // never warn. A count add is needed, so instead of
                        // degrade! (bump=+1 only) the add and warn sit on two
                        // adjacent explicit lines, done indivisibly.
                        if stats.ext_name_cache_skipped > 0 {
                            Counters::add(
                                &self.metrics.counters.deferred_name_cache_overflow,
                                stats.ext_name_cache_skipped,
                            );
                            tracing::warn!(
                                volume = %label,
                                skipped = stats.ext_name_cache_skipped,
                                "extension-record name cache full — remainder resolved via disk reads"
                            );
                        }
                        if stats.deferred_name_read_failures > 0 {
                            Counters::add(
                                &self.metrics.counters.deferred_name_read_failures,
                                stats.deferred_name_read_failures,
                            );
                            tracing::warn!(
                                volume = %label,
                                failures = stats.deferred_name_read_failures,
                                "deferred-name disk reads failed — those names stay unresolved until rescan"
                            );
                        }
                        // Scope-walk degradations (ADR-0024): same single
                        // stats→counters+warn mapping point as the $MFT
                        // ones above; zero for the privileged path.
                        if stats.walk_read_errors > 0 {
                            Counters::add(
                                &self.metrics.counters.walk_read_errors,
                                stats.walk_read_errors,
                            );
                            tracing::warn!(
                                volume = %label,
                                errors = stats.walk_read_errors,
                                "scope walk: paths skipped (unreadable) — absent until re-index"
                            );
                        }
                        if stats.walk_depth_truncated > 0 {
                            Counters::add(
                                &self.metrics.counters.walk_depth_truncated,
                                stats.walk_depth_truncated,
                            );
                            tracing::warn!(
                                volume = %label,
                                truncated = stats.walk_depth_truncated,
                                "scope walk: subtrees not descended (depth cap)"
                            );
                        }
                        self.metrics.record_scan(ScanTrace {
                            volume: label.clone(),
                            source: scan_source.to_string(),
                            read_bytes: stats.mft_bytes,
                            read_ms: stats.elapsed_mft_load_ms,
                            // bytes / 1048.576 / ms == MiB per second;
                            // guarded so a 0 ms load never divides by zero.
                            mb_per_s: if stats.elapsed_mft_load_ms > 0 {
                                stats.mft_bytes as f64
                                    / 1_048.576
                                    / stats.elapsed_mft_load_ms as f64
                            } else {
                                0.0
                            },
                            parse_ms: stats.elapsed_parse_ms,
                            deferred_ms: stats.elapsed_deferred_ms,
                            build_ms: stats.elapsed_build_ms,
                            sort_ms: stats.elapsed_sort_ms,
                            total_ms: stats.elapsed_total_ms,
                            entries: idx.len() as u64,
                            peak_ws_bytes: stats.peak_working_set_bytes,
                        });
                        idx
                    }
                    Err(e) => {
                        *slot.phase.lock() = VolumeState::Failed;
                        self.emit(EngineEvent::VolumeFailed {
                            volume: label,
                            message: e.to_string(),
                        });
                        return;
                    }
                }
            }
        };

        let entries = idx.live_len() as u64;
        *slot.scanned.lock() = entries;
        slot.install_index(idx);
        // Scan path: next_usn is the position at journal open (before the
        // scan), so a flush taken now replays the scan window — correct,
        // just slightly redundant.
        *slot.checkpoint.lock() = Some(JournalCheckpoint {
            journal_id: journal.journal_id(),
            next_usn: journal.next_usn(),
        });
        *slot.phase.lock() = VolumeState::Ready;
        self.emit(EngineEvent::VolumeReady {
            volume: label.clone(),
            entries,
        });
        // Prewarm the query accelerators (dir-path memo, offset table)
        // so the first keystroke never pays the cold-cache cost.
        if let Some(idx) = slot.index.read().as_ref() {
            crate::query::prewarm(idx);
        }

        // 2. Tail the journal until stop or journal-gone.
        let fetch = match journal.open_stat_fetcher() {
            Ok(f) => f,
            Err(e) => {
                // The slot was marked Ready just above. Without this the
                // phase would keep reporting Ready after the worker has
                // exited; every other failure exit sets Failed as well.
                *slot.phase.lock() = VolumeState::Failed;
                self.emit(EngineEvent::VolumeFailed {
                    volume: label,
                    message: e.to_string(),
                });
                return;
            }
        };
        let mut buf = Vec::new();
        // None = "never emitted" → the first change emits immediately. Avoids
        // `Instant - DEBOUNCE` and `checked_sub(..).unwrap()`, both of which
        // panic at boot when uptime < DEBOUNCE.
        let mut last_emit: Option<Instant> = None;
        loop {
            if slot.stop.load(Ordering::Relaxed) {
                // Orderly shutdown: persist the slot with the journal's
                // current position before leaving.
                self.save_slot(
                    &slot,
                    JournalCheckpoint {
                        journal_id: journal.journal_id(),
                        next_usn: journal.next_usn(),
                    },
                );
                return;
            }
            match journal_gone_action(journal.read_blocking(&mut buf)) {
                TailStep::Apply {
                    records: rs,
                    truncated,
                } => {
                    if truncated {
                        Counters::bump(&self.metrics.counters.usn_batches_truncated);
                        tracing::warn!(volume = %label, "USN batch had malformed tail bytes");
                    }
                    if rs.is_empty() {
                        continue;
                    }
                    if let Some(idx) = slot.index.write().as_mut() {
                        let stage = crate::metrics::Stage::start();
                        let s = apply_batch(idx, &rs, fetch.as_ref());
                        Counters::add(
                            &self.metrics.counters.stat_fetch_failures,
                            s.stat_failures as u64,
                        );
                        self.metrics.record_usn(UsnTrace {
                            volume: label.clone(),
                            records: rs.len() as u64,
                            upserted: s.created_or_renamed as u64,
                            deleted: s.deleted as u64,
                            stat_updated: s.stat_updated as u64,
                            stat_failures: s.stat_failures as u64,
                            apply_us: stage.elapsed_us(),
                        });
                        *slot.scanned.lock() = idx.live_len() as u64;
                    }
                    // Concurrency invariant (checkpoint-after-apply):
                    // the index is mutated first, the shared checkpoint
                    // published second. A concurrent `Engine::flush`
                    // reading the checkpoint first therefore always
                    // saves checkpoint ≤ index — the USN replay on load
                    // covers the gap (re-applying records is idempotent;
                    // skipping them would not be — see
                    // `JournalCheckpoint`). Do not reorder these two.
                    *slot.checkpoint.lock() = Some(JournalCheckpoint {
                        journal_id: journal.journal_id(),
                        next_usn: journal.next_usn(),
                    });
                    self.maybe_compact(&slot);
                    if last_emit.is_none_or(|t| t.elapsed() >= INDEX_CHANGED_DEBOUNCE) {
                        last_emit = Some(Instant::now());
                        self.emit(EngineEvent::IndexChanged {
                            volume: label.clone(),
                        });
                    }
                }
                TailStep::Rescan(gone) => {
                    Counters::bump(&self.metrics.counters.journal_rescans);
                    tracing::warn!(volume = %label, ?gone, "journal gone — full rescan");
                    // The old journal id is dead; a flush during the
                    // rescan must not pair it with the new index.
                    *slot.checkpoint.lock() = None;
                    *slot.phase.lock() = VolumeState::Rescanning;
                    self.emit(EngineEvent::RescanStarted {
                        volume: label.clone(),
                    });
                    store.remove();
                    break; // restart the outer loop → fresh journal + scan
                }
                TailStep::Fail(e) => {
                    *slot.phase.lock() = VolumeState::Failed;
                    self.emit(EngineEvent::VolumeFailed {
                        volume: label,
                        message: e.to_string(),
                    });
                    return;
                }
            }
        }
    }
}
506
507 /// Compact once the tombstone/garbage thresholds trip (checked per
508 /// applied USN batch). The copy builds under a *read* guard — this
509 /// volume thread is the index's only writer — and the write lock is
510 /// held for the swap alone. `install_index` bumps the structural
511 /// generation, hard-staling open result handles.
512 pub(super) fn maybe_compact(&self, slot: &VolumeSlot) {
513 let compacted = {
514 let guard = slot.index.read();
515 let Some(idx) = guard.as_ref().filter(|idx| idx.compaction_due()) else {
516 return;
517 };
518 let stage = crate::metrics::Stage::start();
519 let generation = idx.content_generation();
520 let dropped = idx.len() - idx.live_len();
521 let new_idx = idx.compacted();
522 tracing::info!(
523 volume = %slot.label,
524 dropped_entries = dropped,
525 reclaimed_name_bytes = idx.stats(&slot.label).dead_name_bytes,
526 ms = stage.elapsed_us() / 1000,
527 "index compacted"
528 );
529 (new_idx, generation)
530 };
531 // Generation recheck between copy and swap — pure half in
532 // `compact_recheck` (see its doc for the single-writer invariant).
533 let guard = slot.index.read();
534 let current = guard
535 .as_ref()
536 .map(crate::index::VolumeIndex::content_generation);
537 drop(guard);
538 if compact_recheck(compacted.1, current) == CompactionVerdict::Abort {
539 Counters::bump(&self.metrics.counters.compaction_aborts);
540 tracing::warn!(
541 volume = %slot.label,
542 "index mutated during compaction — copy discarded"
543 );
544 return;
545 }
546 slot.install_index(compacted.0);
547 if let Some(idx) = slot.index.read().as_ref() {
548 crate::query::prewarm(idx);
549 }
550 }
551}
552
/// Test-only spawn that injects both seams (the snapshot store and the
/// journal source). Production goes through `index_start` →
/// `volume_thread`; this mirrors `index_start`'s slot construction and
/// thread naming exactly.
#[cfg(all(test, windows))]
impl Engine {
    /// Registers a scanning slot for `label`, starts its worker thread on
    /// the supplied seams, and records the join handle on the engine.
    pub(super) fn spawn_worker_with_seams(
        self: &Arc<Self>,
        label: &str,
        store: Arc<dyn super::seams::SnapshotStore>,
        mut journal: Box<dyn JournalSource>,
    ) {
        let slot = Arc::new(VolumeSlot::scanning(label.to_string(), store));
        let worker_slot = Arc::clone(&slot);
        self.volumes.write().push(slot);

        let engine = Arc::clone(self);
        let thread_name = format!("fmf-vol-{label}");
        let handle = std::thread::Builder::new()
            .name(thread_name)
            .spawn(move || {
                engine.volume_thread_with(worker_slot, journal.as_mut());
            })
            .expect("spawn volume thread");
        self.threads.lock().push(handle);
    }
}