Skip to main content

fmf_core\scan/
probe.rs

1//! I/O strategy probe (`fmf io-probe`).
2//!
3//! Measurement only: reads the exact chunk plan the scan would, parses
4//! nothing, and reports throughput per strategy (ADR-0011).
5
6use std::time::Instant;
7
8use ntfs_reader::errors::NtfsReaderError;
9
10use crate::mft::MftError;
11
12use super::pipeline::{Chunk, SCAN_CHUNK, plan_chunks};
13use super::volume_io::mft_layout;
14
15/// I/O strategy to measure for one $MFT read pass (ADR-0011).
16#[derive(Clone, Copy, Debug, PartialEq, Eq)]
17pub enum IoProbeMode {
18    /// The production strategy: buffered synchronous reads.
19    Buffered,
20    /// Buffered + `FILE_FLAG_SEQUENTIAL_SCAN` cache hint.
21    Seq,
22    /// `FILE_FLAG_NO_BUFFERING`, synchronous (no cache-manager copy).
23    NoBuf,
24    /// `FILE_FLAG_NO_BUFFERING` + `FILE_FLAG_OVERLAPPED` with `queue_depth`
25    /// reads outstanding — tests whether *sequential* multiplexing helps
26    /// (parallel random reads are known to serialize in the kernel).
27    NoBufOverlapped,
28}
29
30/// Throughput result of one measured $MFT read pass.
31pub struct ProbeStats {
32    /// Bytes read during the measured pass.
33    pub bytes: u64,
34    /// Wall-clock duration of the read pass, in milliseconds.
35    pub elapsed_ms: u64,
36    /// Throughput in mebibytes per second (`bytes` over elapsed seconds).
37    pub mb_per_s: f64,
38}
39
40const FILE_FLAG_SEQUENTIAL_SCAN: u32 = 0x0800_0000;
41const FILE_FLAG_NO_BUFFERING: u32 = 0x2000_0000;
42const FILE_FLAG_OVERLAPPED: u32 = 0x4000_0000;
43/// `NO_BUFFERING` alignment unit: one page satisfies any 512/4096-sector
44/// device for offset, length and buffer-address requirements.
45const NOBUF_ALIGN: usize = 4096;
46
47/// Page-aligned read buffer (`NO_BUFFERING` requires aligned addresses).
48struct AlignedBuf {
49    ptr: std::ptr::NonNull<u8>,
50    layout: std::alloc::Layout,
51}
52
53impl AlignedBuf {
54    fn new(size: usize) -> Self {
55        let layout = std::alloc::Layout::from_size_align(size, NOBUF_ALIGN)
56            .expect("NOBUF_ALIGN is a power-of-two alignment");
57        // Safety: layout has non-zero size; abort on allocation failure.
58        let ptr = std::ptr::NonNull::new(unsafe { std::alloc::alloc(layout) })
59            .unwrap_or_else(|| std::alloc::handle_alloc_error(layout));
60        Self { ptr, layout }
61    }
62
63    const fn as_mut_slice(&mut self) -> &mut [u8] {
64        // Safety: owned allocation of layout.size() bytes.
65        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.layout.size()) }
66    }
67}
68
69impl Drop for AlignedBuf {
70    fn drop(&mut self) {
71        // Safety: same layout as the allocation.
72        unsafe { std::alloc::dealloc(self.ptr.as_ptr(), self.layout) }
73    }
74}
75
76fn open_with_flags(volume_path: &str, flags: u32) -> std::io::Result<std::fs::File> {
77    use std::os::windows::fs::OpenOptionsExt;
78    const FILE_SHARE_READ: u32 = 0x1;
79    const FILE_SHARE_WRITE: u32 = 0x2;
80    const FILE_SHARE_DELETE: u32 = 0x4;
81    std::fs::OpenOptions::new()
82        .read(true)
83        .share_mode(FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE)
84        .custom_flags(flags)
85        .open(volume_path)
86}
87
88/// Aligned (offset, length) pair covering a chunk under `NO_BUFFERING`.
89const fn aligned_span(c: &Chunk) -> (u64, usize) {
90    let start = c.phys & !(NOBUF_ALIGN as u64 - 1);
91    let end = (c.phys + c.want as u64).next_multiple_of(NOBUF_ALIGN as u64);
92    (start, (end - start) as usize)
93}
94
95fn probe_sync(volume_path: &str, chunks: &[Chunk], flags: u32) -> std::io::Result<u64> {
96    use std::io::{Read, Seek, SeekFrom};
97    let mut file = open_with_flags(volume_path, flags)?;
98    let no_buffering = flags & FILE_FLAG_NO_BUFFERING != 0;
99    let mut buf = AlignedBuf::new(SCAN_CHUNK + 2 * NOBUF_ALIGN);
100    let mut total = 0u64;
101    for c in chunks {
102        let (phys, want) = if no_buffering {
103            aligned_span(c)
104        } else {
105            (c.phys, c.want)
106        };
107        file.seek(SeekFrom::Start(phys))?;
108        file.read_exact(&mut buf.as_mut_slice()[..want])?;
109        total += want as u64;
110    }
111    Ok(total)
112}
113
114/// One overlapped read slot: its buffer, its event, its OVERLAPPED block.
115struct OvSlot {
116    buf: AlignedBuf,
117    event: windows_sys::Win32::Foundation::HANDLE,
118    ov: Box<windows_sys::Win32::System::IO::OVERLAPPED>,
119    want: usize,
120}
121
122fn probe_nobuf_overlapped(
123    volume_path: &str,
124    chunks: &[Chunk],
125    queue_depth: usize,
126) -> std::io::Result<u64> {
127    use std::os::windows::io::AsRawHandle;
128    use windows_sys::Win32::Foundation::{CloseHandle, GetLastError, HANDLE};
129    use windows_sys::Win32::Storage::FileSystem::ReadFile;
130    use windows_sys::Win32::System::IO::{GetOverlappedResult, OVERLAPPED};
131    use windows_sys::Win32::System::Threading::CreateEventW;
132    const ERROR_IO_PENDING: u32 = 997;
133
134    let file = open_with_flags(volume_path, FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED)?;
135    let handle = file.as_raw_handle() as HANDLE;
136    let qd = queue_depth.clamp(1, 16);
137
138    let mut slots: Vec<OvSlot> = (0..qd)
139        .map(|_| {
140            // Safety: plain event creation; null on failure handled below.
141            let event = unsafe { CreateEventW(std::ptr::null(), 1, 0, std::ptr::null()) };
142            OvSlot {
143                buf: AlignedBuf::new(SCAN_CHUNK + 2 * NOBUF_ALIGN),
144                event,
145                ov: Box::new(unsafe { std::mem::zeroed::<OVERLAPPED>() }),
146                want: 0,
147            }
148        })
149        .collect();
150    if slots.iter().any(|s| s.event.is_null()) {
151        for s in &slots {
152            if !s.event.is_null() {
153                unsafe { CloseHandle(s.event) };
154            }
155        }
156        return Err(std::io::Error::last_os_error());
157    }
158
159    let issue = |slot: &mut OvSlot, c: &Chunk| -> std::io::Result<()> {
160        let (phys, want) = aligned_span(c);
161        slot.want = want;
162        *slot.ov = unsafe { std::mem::zeroed() };
163        slot.ov.Anonymous.Anonymous.Offset = (phys & 0xFFFF_FFFF) as u32;
164        slot.ov.Anonymous.Anonymous.OffsetHigh = (phys >> 32) as u32;
165        slot.ov.hEvent = slot.event;
166        // Safety: buffer outlives the I/O (slot waits before reuse/drop).
167        let ok = unsafe {
168            ReadFile(
169                handle,
170                slot.buf.as_mut_slice().as_mut_ptr(),
171                want as u32,
172                std::ptr::null_mut(),
173                &raw mut *slot.ov,
174            )
175        };
176        if ok == 0 && unsafe { GetLastError() } != ERROR_IO_PENDING {
177            return Err(std::io::Error::last_os_error());
178        }
179        Ok(())
180    };
181    let wait = |slot: &mut OvSlot| -> std::io::Result<u64> {
182        let mut transferred = 0u32;
183        // Safety: the OVERLAPPED belongs to an issued read on this handle.
184        let ok =
185            unsafe { GetOverlappedResult(handle, &raw const *slot.ov, &raw mut transferred, 1) };
186        if ok == 0 {
187            return Err(std::io::Error::last_os_error());
188        }
189        Ok(transferred as u64)
190    };
191
192    // Completions are awaited in issue order — chunk order is what the
193    // parse pipeline would need anyway.
194    let mut total = 0u64;
195    let result = (|| -> std::io::Result<u64> {
196        for (i, c) in chunks.iter().enumerate() {
197            let slot_idx = i % qd;
198            if i >= qd {
199                total += wait(&mut slots[slot_idx])?;
200            }
201            issue(&mut slots[slot_idx], c)?;
202        }
203        let issued = chunks.len();
204        for done in issued.saturating_sub(qd)..issued {
205            total += wait(&mut slots[done % qd])?;
206        }
207        Ok(total)
208    })();
209    for s in &slots {
210        unsafe { CloseHandle(s.event) };
211    }
212    result
213}
214
215/// Measure one $MFT read pass under `mode`. Elevation required (the same
216/// volume-handle rule as the scan).
217///
218/// # Errors
219///
220/// Returns [`MftError::NotElevated`] when the process lacks the privileges to
221/// open the raw volume, or [`MftError::Ntfs`] if reading the $MFT layout or
222/// the measured read pass fails.
223pub fn io_probe(
224    drive: &str,
225    mode: IoProbeMode,
226    queue_depth: usize,
227) -> Result<ProbeStats, MftError> {
228    let drive = drive.trim_end_matches(['\\', '/']);
229    let volume_path = format!(r"\\.\{drive}");
230    let (record_size, data_size, runmap) = mft_layout(&volume_path).map_err(|e| match e {
231        NtfsReaderError::ElevationError => MftError::NotElevated,
232        other => MftError::Ntfs(other),
233    })?;
234    let chunks = plan_chunks(&runmap, data_size, record_size);
235
236    let t = Instant::now();
237    let bytes = match mode {
238        IoProbeMode::Buffered => probe_sync(&volume_path, &chunks, 0),
239        IoProbeMode::Seq => probe_sync(&volume_path, &chunks, FILE_FLAG_SEQUENTIAL_SCAN),
240        IoProbeMode::NoBuf => probe_sync(&volume_path, &chunks, FILE_FLAG_NO_BUFFERING),
241        IoProbeMode::NoBufOverlapped => probe_nobuf_overlapped(&volume_path, &chunks, queue_depth),
242    }
243    .map_err(|e| MftError::Ntfs(e.into()))?;
244    let elapsed = t.elapsed();
245    Ok(ProbeStats {
246        bytes,
247        elapsed_ms: elapsed.as_millis() as u64,
248        mb_per_s: bytes as f64 / (1 << 20) as f64 / elapsed.as_secs_f64().max(1e-9),
249    })
250}