Skip to main content

fmf_core\scan/
pipeline.rs

1//! Read-ahead pipeline (ADR-0011): record-aligned chunk planning over the
2//! $MFT run map, plus the dedicated I/O thread that reads chunk N+1 while
3//! chunk N parses. If the thread can't start, the scan degrades to inline
4//! sequential reads (`scan_pipeline_fallbacks`).
5
6use std::time::{Duration, Instant};
7
8use ntfs_reader::errors::NtfsReaderError;
9
10use super::volume_io::{RunMap, open_raw_volume};
11
12pub(super) const SCAN_CHUNK: usize = 16 << 20;
13/// Chunk buffers cycling between the I/O thread and the parser (one being
14/// read, one queued, one being parsed) — bounds peak RAM at 3 chunks.
15const PIPELINE_BUFFERS: usize = 3;
16
17/// Record-aligned read unit of the $MFT data stream.
18pub(super) struct Chunk {
19    pub(super) logical: u64,
20    pub(super) phys: u64,
21    pub(super) want: usize,
22}
23
24/// Pure chunk-plan arithmetic: record-aligned chunking, sparse-hole
25/// skipping, no I/O.
26pub(super) fn plan_chunks(map: &RunMap, data_size: u64, record_size: usize) -> Vec<Chunk> {
27    let mut chunks = Vec::new();
28    let mut logical = 0u64;
29    while logical < data_size {
30        let Some((phys, contig)) = map.physical(logical) else {
31            logical += record_size as u64; // sparse hole: no records here
32            continue;
33        };
34        let want = SCAN_CHUNK
35            .min(contig as usize)
36            .min((data_size - logical) as usize)
37            / record_size
38            * record_size;
39        if want == 0 {
40            logical += record_size as u64;
41            continue;
42        }
43        chunks.push(Chunk {
44            logical,
45            phys,
46            want,
47        });
48        logical += want as u64;
49    }
50    chunks
51}
52
53/// Read chunks on a dedicated I/O thread while the caller parses the
54/// previous one; buffers cycle through a bounded channel pair. Returns the
55/// accumulated device-read time and the fallback count (1 when the thread
56/// couldn't start and the scan degraded to inline sequential reads).
57pub(super) fn run_chunk_pipeline(
58    volume_path: &str,
59    chunks: &[Chunk],
60    on_chunk: &mut dyn FnMut(usize, &mut [u8]),
61) -> Result<(Duration, u64), NtfsReaderError> {
62    use std::io::{Read, Seek, SeekFrom};
63    use std::sync::mpsc;
64
65    let mut file = open_raw_volume(volume_path)?;
66    let plan: Vec<(u64, usize)> = chunks.iter().map(|c| (c.phys, c.want)).collect();
67    let (full_tx, full_rx) =
68        mpsc::sync_channel::<std::io::Result<(usize, Vec<u8>)>>(PIPELINE_BUFFERS);
69    let (empty_tx, empty_rx) = mpsc::channel::<Vec<u8>>();
70    for _ in 0..PIPELINE_BUFFERS {
71        let _ = empty_tx.send(vec![0u8; SCAN_CHUNK]);
72    }
73
74    let spawned = std::thread::Builder::new()
75        .name("fmf-scan-io".into())
76        .spawn(move || {
77            let mut read_time = Duration::ZERO;
78            for (i, &(phys, want)) in plan.iter().enumerate() {
79                let Ok(mut buf) = empty_rx.recv() else {
80                    break; // parser side gone (error path) — stop reading
81                };
82                let t = Instant::now();
83                let read = file
84                    .seek(SeekFrom::Start(phys))
85                    .and_then(|_| file.read_exact(&mut buf[..want]));
86                read_time += t.elapsed();
87                let failed = read.is_err();
88                if full_tx.send(read.map(|()| (i, buf))).is_err() || failed {
89                    break;
90                }
91            }
92            read_time
93        });
94
95    let handle = match spawned {
96        Ok(h) => h,
97        Err(e) => {
98            // Degraded but correct: read inline on this thread. The original
99            // handle moved into the dead closure, so open a fresh one.
100            tracing::warn!(error = %e, "scan I/O thread unavailable — inline sequential reads");
101            let mut file = open_raw_volume(volume_path)?;
102            let mut buf = vec![0u8; SCAN_CHUNK];
103            let mut read_time = Duration::ZERO;
104            for (i, c) in chunks.iter().enumerate() {
105                let t = Instant::now();
106                file.seek(SeekFrom::Start(c.phys))?;
107                file.read_exact(&mut buf[..c.want])?;
108                read_time += t.elapsed();
109                on_chunk(i, &mut buf[..c.want]);
110            }
111            return Ok((read_time, 1));
112        }
113    };
114
115    let mut result: Result<(), NtfsReaderError> = Ok(());
116    for _ in 0..chunks.len() {
117        match full_rx.recv() {
118            Ok(Ok((i, mut buf))) => {
119                on_chunk(i, &mut buf[..chunks[i].want]);
120                let _ = empty_tx.send(buf);
121            }
122            Ok(Err(e)) => {
123                result = Err(e.into());
124                break;
125            }
126            Err(_) => {
127                result = Err(std::io::Error::other("scan I/O thread terminated early").into());
128                break;
129            }
130        }
131    }
132    // Unblock the thread (its send/recv fail once these drop), then join.
133    drop(full_rx);
134    drop(empty_tx);
135    let read_time = handle.join().unwrap_or(Duration::ZERO);
136    result.map(|()| (read_time, 0))
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use crate::index::testutil::TestDir;

    /// `plan_chunks` contract: every chunk is a whole number of records and
    /// no larger than `SCAN_CHUNK`, chunks come out in ascending logical
    /// order, and the sparse hole is the only part of the stream left out.
    #[test]
    fn plan_chunks_is_record_aligned_and_ordered() {
        let record = 1024usize;
        // Layout: an 8 KiB run, an 8 KiB sparse hole, then a run a little
        // larger than SCAN_CHUNK so that it has to be split.
        let big_run = SCAN_CHUNK as u64 + 4 * 1024;
        let map = RunMap {
            runs: vec![(0, 4096, 8 * 1024), (16 * 1024, 0, big_run)],
        };
        let data_size = 16 * 1024 + big_run;
        let plan = plan_chunks(&map, data_size, record);

        assert!(!plan.is_empty());
        let mut cursor = 0u64;
        for chunk in plan.iter() {
            assert_eq!(chunk.want % record, 0, "chunk not record-aligned");
            assert!(chunk.want <= SCAN_CHUNK);
            assert!(chunk.logical >= cursor, "chunks out of order");
            cursor = chunk.logical + chunk.want as u64;
        }
        // Everything except the sparse hole gets read.
        let covered = plan.iter().map(|chunk| chunk.want).sum::<usize>();
        assert_eq!(covered as u64, data_size - 8 * 1024);
    }

    /// The pipeline only needs a path it can open, so an ordinary file
    /// stands in for the volume: delivery order, buffer reuse and the error
    /// path can all be exercised without elevation.
    #[test]
    fn pipeline_delivers_chunks_in_order_with_recycled_buffers() {
        const RUN_LEN: u64 = 2048;
        const RUN_COUNT: u64 = 8;

        let record = 512usize;
        let dir = TestDir::new();
        let path = dir.join("stream.bin");
        let total = 16 * 1024usize;
        let bytes: Vec<u8> = (0..total).map(|i| (i / 7 % 251) as u8).collect();
        std::fs::write(&path, &bytes).unwrap();
        // Eight 2 KiB runs whose physical placement is rotated by three
        // slots, so logical order and physical order disagree.
        let map = RunMap {
            runs: (0..RUN_COUNT)
                .map(|i| (i * RUN_LEN, ((i + 3) % RUN_COUNT) * RUN_LEN, RUN_LEN))
                .collect(),
        };
        let chunks = plan_chunks(&map, total as u64, record);
        assert_eq!(chunks.len(), 8, "one chunk per contiguous run");

        let mut order = Vec::new();
        let outcome = run_chunk_pipeline(path.to_str().unwrap(), &chunks, &mut |i, got| {
            let start = chunks[i].phys as usize;
            assert_eq!(
                got,
                &bytes[start..start + chunks[i].want],
                "chunk {i} bytes must come from its physical offset"
            );
            order.push(i);
        });
        let (read_time, fallbacks) = outcome.expect("pipeline");
        assert_eq!(order, (0..8).collect::<Vec<_>>(), "strict chunk order");
        assert_eq!(fallbacks, 0);
        assert!(read_time <= std::time::Duration::from_secs(5));
    }

    /// A plan that asks for more bytes than the file holds must come back
    /// as an error, without the callback ever running and without hanging
    /// on the channel pair.
    #[test]
    fn pipeline_propagates_read_errors() {
        let dir = TestDir::new();
        let path = dir.join("short.bin");
        std::fs::write(&path, vec![0u8; 1024]).unwrap();
        // 4 KiB requested from a 1 KiB file: `read_exact` hits EOF.
        let plan = [Chunk {
            logical: 0,
            phys: 0,
            want: 4096,
        }];
        let mut calls = 0;
        let outcome = run_chunk_pipeline(path.to_str().unwrap(), &plan, &mut |_, _| {
            calls += 1;
        });
        assert!(outcome.is_err());
        assert_eq!(calls, 0);
    }
}