Skip to main content

fmf_core\usn/
session.rs

1//! Live USN-journal session: volume handle, FSCTL wrappers, blocking reads
2//! and the per-file stat fetcher.
3//!
4//! This is the only OS-facing part of the `usn` module — everything above it
5//! works on parsed records.
6
7use std::ffi::OsStr;
8use std::os::windows::ffi::OsStrExt;
9use std::os::windows::io::{FromRawHandle, OwnedHandle, RawHandle};
10
11use thiserror::Error;
12use windows_sys::Win32::Foundation::{
13    ERROR_INVALID_PARAMETER, ERROR_JOURNAL_DELETE_IN_PROGRESS, ERROR_JOURNAL_ENTRY_DELETED,
14    ERROR_JOURNAL_NOT_ACTIVE, GENERIC_READ, GetLastError, HANDLE, INVALID_HANDLE_VALUE,
15};
16use windows_sys::Win32::Storage::FileSystem::{
17    BY_HANDLE_FILE_INFORMATION, CreateFileW, FILE_FLAG_BACKUP_SEMANTICS, FILE_ID_DESCRIPTOR,
18    FILE_SHARE_DELETE, FILE_SHARE_READ, FILE_SHARE_WRITE, GetFileInformationByHandle,
19    OPEN_EXISTING, OpenFileById,
20};
21use windows_sys::Win32::System::IO::DeviceIoControl;
22use windows_sys::Win32::System::Ioctl::{
23    CREATE_USN_JOURNAL_DATA, FSCTL_CREATE_USN_JOURNAL, FSCTL_QUERY_USN_JOURNAL,
24    FSCTL_READ_USN_JOURNAL, READ_USN_JOURNAL_DATA_V0, USN_JOURNAL_DATA_V0,
25};
26
27use super::apply::StatFetcher;
28use super::records::{UsnRecord, parse_buffer};
29
30/// Hard failure from the OS-facing journal/volume layer (unrecoverable here;
31/// distinct from the recoverable journal-gone conditions in [`JournalGone`]).
32#[derive(Debug, Error)]
33pub enum UsnError {
34    /// Opening the volume handle failed: the volume path (`\\.\C:`) and the
35    /// raw win32 error code.
36    #[error("cannot open volume {0} (win32 error {1})")]
37    OpenVolume(String, u32),
38    /// A `DeviceIoControl`/FSCTL call failed, carrying the raw win32 error code.
39    #[error("FSCTL failed (win32 error {0})")]
40    Fsctl(u32),
41}
42
43/// Why the journal can no longer be tailed; all of these mean "fall back to
44/// a full rescan" (docs/RESEARCH.md established practice).
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum JournalGone {
47    /// The requested USN range was overwritten/purged (`ERROR_JOURNAL_ENTRY_DELETED`).
48    EntryDeleted,
49    /// The journal is being deleted (`ERROR_JOURNAL_DELETE_IN_PROGRESS`).
50    DeleteInProgress,
51    /// No active journal exists on the volume (`ERROR_JOURNAL_NOT_ACTIVE`).
52    NotActive,
53    /// The journal id no longer matches the persisted checkpoint (journal was
54    /// recreated; surfaced as `ERROR_INVALID_PARAMETER`).
55    IdMismatch,
56}
57
58/// Result of one blocking journal read: either a parsed batch of records or a
59/// recoverable signal that the journal can no longer be tailed.
60pub enum ReadOutcome {
61    /// A batch of parsed records from the journal buffer.
62    Records {
63        /// The parsed USN records, in journal order.
64        records: Vec<UsnRecord>,
65        /// Trailing bytes were malformed and dropped — surfaced as a
66        /// counter + warning by the caller.
67        truncated: bool,
68    },
69    /// The journal can no longer be tailed; the caller falls back to a rescan.
70    Gone(JournalGone),
71}
72
73/// An open USN journal positioned for tailing: the volume handle plus the
74/// current replay cursor.
75pub struct UsnJournal {
76    handle: OwnedHandle,
77    /// The journal's identity (`UsnJournalID`); changes if NTFS recreates it,
78    /// which invalidates any persisted checkpoint.
79    pub journal_id: u64,
80    /// The next USN to read from; advances past each returned batch.
81    pub next_usn: i64,
82}
83
84fn wide(s: &str) -> Vec<u16> {
85    OsStr::new(s)
86        .encode_wide()
87        .chain(std::iter::once(0))
88        .collect()
89}
90
91fn open_volume_handle(drive: &str) -> Result<OwnedHandle, UsnError> {
92    let path = format!(r"\\.\{}", drive.trim_end_matches(['\\', '/']));
93    let wpath = wide(&path);
94    unsafe {
95        let h = CreateFileW(
96            wpath.as_ptr(),
97            GENERIC_READ,
98            FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
99            std::ptr::null(),
100            OPEN_EXISTING,
101            0,
102            std::ptr::null_mut(),
103        );
104        if h == INVALID_HANDLE_VALUE {
105            return Err(UsnError::OpenVolume(path, GetLastError()));
106        }
107        Ok(OwnedHandle::from_raw_handle(h as RawHandle))
108    }
109}
110
111impl UsnJournal {
112    /// Open the journal for tailing. Creates it when missing (requires
113    /// elevation, which the whole scan path already needs). `start_usn` is
114    /// the persisted checkpoint; pass `None` to start at the current end.
115    ///
116    /// # Errors
117    ///
118    /// Returns [`UsnError::OpenVolume`] if the volume handle cannot be opened,
119    /// or [`UsnError::Fsctl`] if creating or querying the journal fails.
120    pub fn open(drive: &str, start_usn: Option<i64>) -> Result<Self, UsnError> {
121        let handle = open_volume_handle(drive)?;
122        let data = Self::query_or_create(&handle)?;
123        let next = match start_usn {
124            Some(usn) => usn,
125            None => data.NextUsn,
126        };
127        Ok(Self {
128            handle,
129            journal_id: data.UsnJournalID,
130            next_usn: next,
131        })
132    }
133
134    /// True if the persisted checkpoint is still replayable from this journal.
135    #[must_use]
136    pub const fn checkpoint_valid(&self, persisted_journal_id: u64, data_first_usn: i64) -> bool {
137        self.journal_id == persisted_journal_id && self.next_usn >= data_first_usn
138    }
139
140    /// Query the live journal metadata (id and retained USN range).
141    ///
142    /// # Errors
143    ///
144    /// Returns [`UsnError::Fsctl`] if the `FSCTL_QUERY_USN_JOURNAL` call fails
145    /// (including a journal that is no longer active).
146    pub fn query(&self) -> Result<USN_JOURNAL_DATA_V0, UsnError> {
147        Self::query_raw(&self.handle).map_err(|e| match e {
148            QueryErr::Gone => UsnError::Fsctl(ERROR_JOURNAL_NOT_ACTIVE),
149            QueryErr::Os(code) => UsnError::Fsctl(code),
150        })
151    }
152
153    fn query_or_create(handle: &OwnedHandle) -> Result<USN_JOURNAL_DATA_V0, UsnError> {
154        match Self::query_raw(handle) {
155            Ok(d) => Ok(d),
156            Err(QueryErr::Gone) => {
157                // 0 = let NTFS pick defaults (typically 32MB max).
158                let create = CREATE_USN_JOURNAL_DATA {
159                    MaximumSize: 0,
160                    AllocationDelta: 0,
161                };
162                unsafe {
163                    let mut returned = 0u32;
164                    let ok = DeviceIoControl(
165                        raw(handle),
166                        FSCTL_CREATE_USN_JOURNAL,
167                        (&raw const create).cast(),
168                        size_of::<CREATE_USN_JOURNAL_DATA>() as u32,
169                        std::ptr::null_mut(),
170                        0,
171                        &raw mut returned,
172                        std::ptr::null_mut(),
173                    );
174                    if ok == 0 {
175                        return Err(UsnError::Fsctl(GetLastError()));
176                    }
177                }
178                Self::query_raw(handle).map_err(|e| match e {
179                    QueryErr::Os(code) => UsnError::Fsctl(code),
180                    QueryErr::Gone => UsnError::Fsctl(ERROR_JOURNAL_NOT_ACTIVE),
181                })
182            }
183            Err(QueryErr::Os(code)) => Err(UsnError::Fsctl(code)),
184        }
185    }
186
187    fn query_raw(handle: &OwnedHandle) -> Result<USN_JOURNAL_DATA_V0, QueryErr> {
188        unsafe {
189            let mut data: USN_JOURNAL_DATA_V0 = std::mem::zeroed();
190            let mut returned = 0u32;
191            let ok = DeviceIoControl(
192                raw(handle),
193                FSCTL_QUERY_USN_JOURNAL,
194                std::ptr::null(),
195                0,
196                (&raw mut data).cast(),
197                size_of::<USN_JOURNAL_DATA_V0>() as u32,
198                &raw mut returned,
199                std::ptr::null_mut(),
200            );
201            if ok == 0 {
202                let code = GetLastError();
203                return Err(match code {
204                    ERROR_JOURNAL_NOT_ACTIVE | ERROR_JOURNAL_DELETE_IN_PROGRESS => QueryErr::Gone,
205                    other => QueryErr::Os(other),
206                });
207            }
208            Ok(data)
209        }
210    }
211
212    /// Blocking read: returns once at least one record is available (or the
213    /// journal became invalid). Advances `next_usn` past the returned batch.
214    ///
215    /// # Errors
216    ///
217    /// Returns [`UsnError::Fsctl`] if the `FSCTL_READ_USN_JOURNAL` call fails
218    /// for a reason other than a recoverable journal-gone condition (those
219    /// are reported through [`ReadOutcome`]).
220    pub fn read_blocking(&mut self, buf: &mut Vec<u8>) -> Result<ReadOutcome, UsnError> {
221        const BUF: usize = 1 << 16;
222        buf.resize(BUF, 0);
223        let input = READ_USN_JOURNAL_DATA_V0 {
224            StartUsn: self.next_usn,
225            ReasonMask: u32::MAX,
226            ReturnOnlyOnClose: 0,
227            Timeout: 0,
228            BytesToWaitFor: 1, // block until data arrives
229            UsnJournalID: self.journal_id,
230        };
231        unsafe {
232            let mut returned = 0u32;
233            let ok = DeviceIoControl(
234                raw(&self.handle),
235                FSCTL_READ_USN_JOURNAL,
236                (&raw const input).cast(),
237                size_of::<READ_USN_JOURNAL_DATA_V0>() as u32,
238                buf.as_mut_ptr().cast(),
239                BUF as u32,
240                &raw mut returned,
241                std::ptr::null_mut(),
242            );
243            if ok == 0 {
244                let code = GetLastError();
245                return match code {
246                    ERROR_JOURNAL_ENTRY_DELETED => Ok(ReadOutcome::Gone(JournalGone::EntryDeleted)),
247                    ERROR_JOURNAL_DELETE_IN_PROGRESS => {
248                        Ok(ReadOutcome::Gone(JournalGone::DeleteInProgress))
249                    }
250                    ERROR_JOURNAL_NOT_ACTIVE => Ok(ReadOutcome::Gone(JournalGone::NotActive)),
251                    // Returned when UsnJournalID no longer matches.
252                    ERROR_INVALID_PARAMETER => Ok(ReadOutcome::Gone(JournalGone::IdMismatch)),
253                    other => Err(UsnError::Fsctl(other)),
254                };
255            }
256            let (next, records, truncated) = parse_buffer(&buf[..returned as usize]);
257            if next != 0 {
258                self.next_usn = next as i64;
259            }
260            Ok(ReadOutcome::Records { records, truncated })
261        }
262    }
263}
264
265enum QueryErr {
266    Gone,
267    Os(u32),
268}
269
270fn raw(h: &OwnedHandle) -> HANDLE {
271    use std::os::windows::io::AsRawHandle;
272    h.as_raw_handle() as HANDLE
273}
274
275/// Live stat fetcher: opens the file by FRN on the same volume and reads
276/// size + mtime. Read-only, never follows the open with any mutation.
277pub struct VolumeStatFetcher {
278    handle: OwnedHandle,
279    failures: std::sync::atomic::AtomicU64,
280}
281
282impl VolumeStatFetcher {
283    /// Open a read-only volume handle for per-file stat lookups by FRN.
284    ///
285    /// # Errors
286    ///
287    /// Returns [`UsnError::OpenVolume`] if the volume handle cannot be opened.
288    pub fn open(drive: &str) -> Result<Self, UsnError> {
289        Ok(Self {
290            handle: open_volume_handle(drive)?,
291            failures: std::sync::atomic::AtomicU64::new(0),
292        })
293    }
294}
295
296impl VolumeStatFetcher {
297    /// Failures here are expected (file deleted before we look) but a flood
298    /// of them is not — count every one, log the first few with the win32
299    /// code so the pattern is diagnosable.
300    fn note_failure(&self, frn: u64, stage: &str) {
301        let n = self
302            .failures
303            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
304        if n < 5 {
305            let code = unsafe { GetLastError() };
306            tracing::warn!(frn, stage, code, "stat fetch failed");
307        }
308    }
309}
310
311impl StatFetcher for VolumeStatFetcher {
312    fn stat(&self, frn: u64) -> Option<(u64, i64)> {
313        unsafe {
314            let mut desc: FILE_ID_DESCRIPTOR = std::mem::zeroed();
315            desc.dwSize = size_of::<FILE_ID_DESCRIPTOR>() as u32;
316            desc.Type = 0; // FileIdType
317            desc.Anonymous.FileId = frn as i64;
318            let h = OpenFileById(
319                raw(&self.handle),
320                &raw const desc,
321                0, // attributes-only access
322                FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
323                std::ptr::null(),
324                FILE_FLAG_BACKUP_SEMANTICS,
325            );
326            if h == INVALID_HANDLE_VALUE {
327                self.note_failure(frn, "OpenFileById");
328                return None;
329            }
330            let h = OwnedHandle::from_raw_handle(h as RawHandle);
331            let mut info: BY_HANDLE_FILE_INFORMATION = std::mem::zeroed();
332            if GetFileInformationByHandle(raw(&h), &raw mut info) == 0 {
333                self.note_failure(frn, "GetFileInformationByHandle");
334                return None;
335            }
336            let size = ((info.nFileSizeHigh as u64) << 32) | info.nFileSizeLow as u64;
337            let mtime = ((info.ftLastWriteTime.dwHighDateTime as i64) << 32)
338                | info.ftLastWriteTime.dwLowDateTime as i64;
339            Some((size, mtime))
340        }
341    }
342}
343
344#[cfg(test)]
345mod tests {
346    use super::*;
347
348    /// Live smoke for the OS-facing session: open the C: journal, query it,
349    /// and complete one blocking read. Run from an elevated shell:
350    /// `FMF_ADMIN_TESTS=1` cargo test -p fmf-core -- --ignored `usn_journal`
351    #[test]
352    #[ignore = "requires elevation; gated by FMF_ADMIN_TESTS"]
353    fn usn_journal_live_open_query_and_one_read() {
354        if std::env::var("FMF_ADMIN_TESTS").as_deref() != Ok("1") {
355            eprintln!("FMF_ADMIN_TESTS != 1 — skipping");
356            return;
357        }
358        let mut journal = UsnJournal::open("C:", None).expect("open C: journal (elevated?)");
359        assert_ne!(journal.journal_id, 0);
360
361        let data = journal.query().expect("FSCTL_QUERY_USN_JOURNAL");
362        assert_eq!(data.UsnJournalID, journal.journal_id);
363        assert!(journal.checkpoint_valid(data.UsnJournalID, data.FirstUsn));
364        assert!(!journal.checkpoint_valid(data.UsnJournalID.wrapping_add(1), data.FirstUsn));
365
366        // Rewind to the oldest retained USN so the blocking read returns
367        // existing history immediately instead of waiting for new activity.
368        let first_usn = data.FirstUsn;
369        journal.next_usn = first_usn;
370
371        // read_blocking has no timeout by design; run it on a helper thread
372        // and bound the wait here so a regression hangs the test, not the
373        // suite. The tickle file (temp dir is on C: on a stock setup) covers
374        // the freshly-created-journal case where no history exists yet.
375        let (tx, rx) = std::sync::mpsc::channel();
376        let reader = std::thread::spawn(move || {
377            let mut buf = Vec::new();
378            let outcome = journal.read_blocking(&mut buf);
379            let _ = tx.send((outcome, journal.next_usn));
380        });
381        let tickle = std::env::temp_dir().join("fmf-usn-smoke.tmp");
382        let _ = std::fs::write(&tickle, b"tick");
383        let _ = std::fs::remove_file(&tickle);
384
385        let (outcome, advanced_usn) = rx
386            .recv_timeout(std::time::Duration::from_secs(30))
387            .expect("read_blocking did not return within 30s");
388        reader.join().unwrap();
389
390        match outcome.expect("FSCTL_READ_USN_JOURNAL") {
391            ReadOutcome::Records { records, truncated } => {
392                assert!(!truncated, "live FSCTL buffer flagged as truncated");
393                assert!(!records.is_empty(), "blocking read returned no records");
394                assert!(
395                    advanced_usn > first_usn,
396                    "next_usn must advance past the batch"
397                );
398            }
399            ReadOutcome::Gone(gone) => panic!("journal gone during smoke: {gone:?}"),
400        }
401    }
402}