Skip to main content

fmf_core\engine/
seams.rs

1//! The engine's only two OS-effect seams (ADR-0018 — this is the hard cap,
2//! do not add a third): snapshot persistence and the USN journal session.
3//! They exist so the volume worker's failure paths (corrupt snapshot,
4//! journal-gone, failed saves, stat-fetch storms) replay in unprivileged,
5//! deterministic tests (`worker_tests.rs`). The Windows implementations are
6//! thin wrappers over the exact calls the worker made before the seam was
7//! introduced — behavior-identical by construction.
8//!
9//! Granularity guard: every method here runs at establish/batch/save
10//! frequency. Nothing per-entry goes through these traits (the per-record
11//! `StatFetcher` handed out below was already a `dyn` call before the seam
12//! existed — see `usn::apply_batch`).
13
14use std::path::PathBuf;
15
16use crate::index::VolumeIndex;
17#[cfg(windows)]
18use crate::usn::{ReadOutcome, StatFetcher, UsnError, UsnJournal, VolumeStatFetcher};
19
20/// Snapshot persistence for one volume (`{index_dir}\{letter}.fmfidx`).
21pub trait SnapshotStore: Send + Sync {
22    /// Load the persisted snapshot: the rebuilt index plus the journal
23    /// checkpoint (`journal_id`, `next_usn`) it was saved with.
24    /// `ErrorKind::NotFound` means "first run" (not a failure); any other
25    /// error means a corrupt/unreadable file — the caller counts it in
26    /// `snapshot_load_failures` and falls back to a full scan.
27    fn load(&self) -> std::io::Result<(VolumeIndex, u64, i64)>;
28    /// Size of the persisted snapshot in bytes (observability only — the
29    /// restore `ScanTrace`). Best effort; 0 when unknown.
30    fn file_bytes(&self) -> u64;
31    /// Persist the index with its checkpoint atomically (tmp + rename):
32    /// a torn write must never become a loadable snapshot.
33    fn save_atomic(&self, idx: &VolumeIndex, journal_id: u64, next_usn: i64)
34    -> std::io::Result<()>;
35    /// Best-effort removal (journal-gone: a snapshot pinned to the dead
36    /// journal id must not be restored on the next start). Failures are
37    /// intentionally ignored — checkpoint validation on load rejects a
38    /// stale file anyway.
39    fn remove(&self);
40}
41
42/// Production store: thin wrapper over `VolumeIndex::{load_from, save_to}`
43/// plus `fs::{metadata, remove_file}` on the volume's snapshot path.
44pub struct WinSnapshotStore {
45    path: PathBuf,
46}
47
48impl WinSnapshotStore {
49    pub(crate) const fn new(path: PathBuf) -> Self {
50        Self { path }
51    }
52}
53
54impl SnapshotStore for WinSnapshotStore {
55    fn load(&self) -> std::io::Result<(VolumeIndex, u64, i64)> {
56        VolumeIndex::load_from(&self.path)
57    }
58
59    fn file_bytes(&self) -> u64 {
60        std::fs::metadata(&self.path).map_or(0, |m| m.len())
61    }
62
63    fn save_atomic(
64        &self,
65        idx: &VolumeIndex,
66        journal_id: u64,
67        next_usn: i64,
68    ) -> std::io::Result<()> {
69        idx.save_to(&self.path, journal_id, next_usn)
70    }
71
72    fn remove(&self) {
73        let _ = std::fs::remove_file(&self.path);
74    }
75}
76
77/// What checkpoint validation needs from `FSCTL_QUERY_USN_JOURNAL`.
78#[cfg(windows)]
79#[derive(Clone, Copy, Debug, PartialEq, Eq)]
80pub struct JournalView {
81    pub(crate) journal_id: u64,
82    /// Oldest USN still retained — a persisted cursor older than this has
83    /// lost records and cannot be replayed.
84    pub(crate) first_usn: i64,
85}
86
87#[cfg(windows)]
88impl JournalView {
89    /// The synthetic view for a scope-mode (folder-walk) snapshot (ADR-0024):
90    /// journal id 0 with no retention window, so `snapshot_decision` always
91    /// restores a loaded walk snapshot — there is no USN cursor to validate.
92    pub(crate) const fn scope() -> Self {
93        Self {
94            journal_id: 0,
95            first_usn: 0,
96        }
97    }
98}
99
100/// One volume's USN journal session, reopenable across journal-gone
101/// rescans. `open` must succeed before any other method is called (the
102/// worker guarantees this; implementations may panic otherwise — the
103/// worker's panic firewall turns that into a visible `VolumeFailed`).
104#[cfg(windows)]
105pub trait JournalSource: Send {
106    /// (Re)open the journal, creating it when missing. Positions the
107    /// cursor at the journal's current end. Called once per establish
108    /// cycle: at start and after every journal-gone rescan.
109    fn open(&mut self) -> Result<(), UsnError>;
110    /// Live journal identity/retention for checkpoint validation.
111    fn query(&mut self) -> Result<JournalView, UsnError>;
112    /// Blocking read of the next batch. Semantics the worker relies on:
113    /// blocks until records exist, then returns them and advances the
114    /// cursor past the batch; `Gone` when the journal died under us; `Err`
115    /// on an unrecoverable failure. An empty `Records` batch is a benign
116    /// wakeup — the worker re-checks its stop flag and reads again (fakes
117    /// use this to unblock on stop; the live read returns on the next
118    /// volume write, which is what keeps `Engine::shutdown`'s join prompt).
119    fn read_blocking(&mut self, buf: &mut Vec<u8>) -> Result<ReadOutcome, UsnError>;
120    /// Journal id of the open session (checkpoint identity).
121    fn journal_id(&self) -> u64;
122    /// Cursor the next read starts at (the value persisted in checkpoints).
123    fn next_usn(&self) -> i64;
124    /// Reposition the cursor (a snapshot restore replays from its
125    /// persisted checkpoint instead of the journal's current end).
126    fn set_next_usn(&mut self, usn: i64);
127    /// Size/mtime fetcher bound to the same volume, opened once per tail
128    /// session. Per-record `stat` calls were already dynamic (`usn::apply`).
129    fn open_stat_fetcher(&self) -> Result<Box<dyn StatFetcher>, UsnError>;
130}
131
132/// Production journal: thin wrapper over `usn::session::UsnJournal` /
133/// `VolumeStatFetcher` for one drive label.
134#[cfg(windows)]
135pub struct WinJournalSource {
136    label: String,
137    session: Option<UsnJournal>,
138}
139
140#[cfg(windows)]
141impl WinJournalSource {
142    pub(crate) const fn new(label: String) -> Self {
143        Self {
144            label,
145            session: None,
146        }
147    }
148
149    const fn session(&self) -> &UsnJournal {
150        self.session.as_ref().expect("journal used before open")
151    }
152}
153
154#[cfg(windows)]
155impl JournalSource for WinJournalSource {
156    fn open(&mut self) -> Result<(), UsnError> {
157        // Drop the previous session first — the pre-seam code's rebinding
158        // per outer-loop iteration closed the old volume handle before
159        // opening the new one.
160        self.session = None;
161        self.session = Some(UsnJournal::open(&self.label, None)?);
162        Ok(())
163    }
164
165    fn query(&mut self) -> Result<JournalView, UsnError> {
166        self.session().query().map(|d| JournalView {
167            journal_id: d.UsnJournalID,
168            first_usn: d.FirstUsn,
169        })
170    }
171
172    fn read_blocking(&mut self, buf: &mut Vec<u8>) -> Result<ReadOutcome, UsnError> {
173        self.session
174            .as_mut()
175            .expect("journal used before open")
176            .read_blocking(buf)
177    }
178
179    fn journal_id(&self) -> u64 {
180        self.session().journal_id
181    }
182
183    fn next_usn(&self) -> i64 {
184        self.session().next_usn
185    }
186
187    fn set_next_usn(&mut self, usn: i64) {
188        self.session
189            .as_mut()
190            .expect("journal used before open")
191            .next_usn = usn;
192    }
193
194    fn open_stat_fetcher(&self) -> Result<Box<dyn StatFetcher>, UsnError> {
195        Ok(Box::new(VolumeStatFetcher::open(&self.label)?))
196    }
197}