fmf_core\engine/seams.rs
1//! The engine's only two OS-effect seams (ADR-0018 — this is the hard cap,
2//! do not add a third): snapshot persistence and the USN journal session.
3//! They exist so the volume worker's failure paths (corrupt snapshot,
4//! journal-gone, failed saves, stat-fetch storms) replay in unprivileged,
5//! deterministic tests (`worker_tests.rs`). The Windows implementations are
6//! thin wrappers over the exact calls the worker made before the seam was
7//! introduced — behavior-identical by construction.
8//!
9//! Granularity guard: every method here runs at establish/batch/save
10//! frequency. Nothing per-entry goes through these traits (the per-record
11//! `StatFetcher` handed out below was already a `dyn` call before the seam
12//! existed — see `usn::apply_batch`).
13
14use std::path::PathBuf;
15
16use crate::index::VolumeIndex;
17#[cfg(windows)]
18use crate::usn::{ReadOutcome, StatFetcher, UsnError, UsnJournal, VolumeStatFetcher};
19
20/// Snapshot persistence for one volume (`{index_dir}\{letter}.fmfidx`).
21pub trait SnapshotStore: Send + Sync {
22 /// Load the persisted snapshot: the rebuilt index plus the journal
23 /// checkpoint (`journal_id`, `next_usn`) it was saved with.
24 /// `ErrorKind::NotFound` means "first run" (not a failure); any other
25 /// error means a corrupt/unreadable file — the caller counts it in
26 /// `snapshot_load_failures` and falls back to a full scan.
27 fn load(&self) -> std::io::Result<(VolumeIndex, u64, i64)>;
28 /// Size of the persisted snapshot in bytes (observability only — the
29 /// restore `ScanTrace`). Best effort; 0 when unknown.
30 fn file_bytes(&self) -> u64;
31 /// Persist the index with its checkpoint atomically (tmp + rename):
32 /// a torn write must never become a loadable snapshot.
33 fn save_atomic(&self, idx: &VolumeIndex, journal_id: u64, next_usn: i64)
34 -> std::io::Result<()>;
35 /// Best-effort removal (journal-gone: a snapshot pinned to the dead
36 /// journal id must not be restored on the next start). Failures are
37 /// intentionally ignored — checkpoint validation on load rejects a
38 /// stale file anyway.
39 fn remove(&self);
40}
41
42/// Production store: thin wrapper over `VolumeIndex::{load_from, save_to}`
43/// plus `fs::{metadata, remove_file}` on the volume's snapshot path.
44pub struct WinSnapshotStore {
45 path: PathBuf,
46}
47
48impl WinSnapshotStore {
49 pub(crate) const fn new(path: PathBuf) -> Self {
50 Self { path }
51 }
52}
53
54impl SnapshotStore for WinSnapshotStore {
55 fn load(&self) -> std::io::Result<(VolumeIndex, u64, i64)> {
56 VolumeIndex::load_from(&self.path)
57 }
58
59 fn file_bytes(&self) -> u64 {
60 std::fs::metadata(&self.path).map_or(0, |m| m.len())
61 }
62
63 fn save_atomic(
64 &self,
65 idx: &VolumeIndex,
66 journal_id: u64,
67 next_usn: i64,
68 ) -> std::io::Result<()> {
69 idx.save_to(&self.path, journal_id, next_usn)
70 }
71
72 fn remove(&self) {
73 let _ = std::fs::remove_file(&self.path);
74 }
75}
76
77/// What checkpoint validation needs from `FSCTL_QUERY_USN_JOURNAL`.
78#[cfg(windows)]
79#[derive(Clone, Copy, Debug, PartialEq, Eq)]
80pub struct JournalView {
81 pub(crate) journal_id: u64,
82 /// Oldest USN still retained — a persisted cursor older than this has
83 /// lost records and cannot be replayed.
84 pub(crate) first_usn: i64,
85}
86
87#[cfg(windows)]
88impl JournalView {
89 /// The synthetic view for a scope-mode (folder-walk) snapshot (ADR-0024):
90 /// journal id 0 with no retention window, so `snapshot_decision` always
91 /// restores a loaded walk snapshot — there is no USN cursor to validate.
92 pub(crate) const fn scope() -> Self {
93 Self {
94 journal_id: 0,
95 first_usn: 0,
96 }
97 }
98}
99
100/// One volume's USN journal session, reopenable across journal-gone
101/// rescans. `open` must succeed before any other method is called (the
102/// worker guarantees this; implementations may panic otherwise — the
103/// worker's panic firewall turns that into a visible `VolumeFailed`).
104#[cfg(windows)]
105pub trait JournalSource: Send {
106 /// (Re)open the journal, creating it when missing. Positions the
107 /// cursor at the journal's current end. Called once per establish
108 /// cycle: at start and after every journal-gone rescan.
109 fn open(&mut self) -> Result<(), UsnError>;
110 /// Live journal identity/retention for checkpoint validation.
111 fn query(&mut self) -> Result<JournalView, UsnError>;
112 /// Blocking read of the next batch. Semantics the worker relies on:
113 /// blocks until records exist, then returns them and advances the
114 /// cursor past the batch; `Gone` when the journal died under us; `Err`
115 /// on an unrecoverable failure. An empty `Records` batch is a benign
116 /// wakeup — the worker re-checks its stop flag and reads again (fakes
117 /// use this to unblock on stop; the live read returns on the next
118 /// volume write, which is what keeps `Engine::shutdown`'s join prompt).
119 fn read_blocking(&mut self, buf: &mut Vec<u8>) -> Result<ReadOutcome, UsnError>;
120 /// Journal id of the open session (checkpoint identity).
121 fn journal_id(&self) -> u64;
122 /// Cursor the next read starts at (the value persisted in checkpoints).
123 fn next_usn(&self) -> i64;
124 /// Reposition the cursor (a snapshot restore replays from its
125 /// persisted checkpoint instead of the journal's current end).
126 fn set_next_usn(&mut self, usn: i64);
127 /// Size/mtime fetcher bound to the same volume, opened once per tail
128 /// session. Per-record `stat` calls were already dynamic (`usn::apply`).
129 fn open_stat_fetcher(&self) -> Result<Box<dyn StatFetcher>, UsnError>;
130}
131
132/// Production journal: thin wrapper over `usn::session::UsnJournal` /
133/// `VolumeStatFetcher` for one drive label.
134#[cfg(windows)]
135pub struct WinJournalSource {
136 label: String,
137 session: Option<UsnJournal>,
138}
139
140#[cfg(windows)]
141impl WinJournalSource {
142 pub(crate) const fn new(label: String) -> Self {
143 Self {
144 label,
145 session: None,
146 }
147 }
148
149 const fn session(&self) -> &UsnJournal {
150 self.session.as_ref().expect("journal used before open")
151 }
152}
153
154#[cfg(windows)]
155impl JournalSource for WinJournalSource {
156 fn open(&mut self) -> Result<(), UsnError> {
157 // Drop the previous session first — the pre-seam code's rebinding
158 // per outer-loop iteration closed the old volume handle before
159 // opening the new one.
160 self.session = None;
161 self.session = Some(UsnJournal::open(&self.label, None)?);
162 Ok(())
163 }
164
165 fn query(&mut self) -> Result<JournalView, UsnError> {
166 self.session().query().map(|d| JournalView {
167 journal_id: d.UsnJournalID,
168 first_usn: d.FirstUsn,
169 })
170 }
171
172 fn read_blocking(&mut self, buf: &mut Vec<u8>) -> Result<ReadOutcome, UsnError> {
173 self.session
174 .as_mut()
175 .expect("journal used before open")
176 .read_blocking(buf)
177 }
178
179 fn journal_id(&self) -> u64 {
180 self.session().journal_id
181 }
182
183 fn next_usn(&self) -> i64 {
184 self.session().next_usn
185 }
186
187 fn set_next_usn(&mut self, usn: i64) {
188 self.session
189 .as_mut()
190 .expect("journal used before open")
191 .next_usn = usn;
192 }
193
194 fn open_stat_fetcher(&self) -> Result<Box<dyn StatFetcher>, UsnError> {
195 Ok(Box::new(VolumeStatFetcher::open(&self.label)?))
196 }
197}