fmf_core\scan/
pipeline.rs1use std::time::{Duration, Instant};
7
8use ntfs_reader::errors::NtfsReaderError;
9
10use super::volume_io::{RunMap, open_raw_volume};
11
12pub(super) const SCAN_CHUNK: usize = 16 << 20;
13const PIPELINE_BUFFERS: usize = 3;
16
17pub(super) struct Chunk {
19 pub(super) logical: u64,
20 pub(super) phys: u64,
21 pub(super) want: usize,
22}
23
24pub(super) fn plan_chunks(map: &RunMap, data_size: u64, record_size: usize) -> Vec<Chunk> {
27 let mut chunks = Vec::new();
28 let mut logical = 0u64;
29 while logical < data_size {
30 let Some((phys, contig)) = map.physical(logical) else {
31 logical += record_size as u64; continue;
33 };
34 let want = SCAN_CHUNK
35 .min(contig as usize)
36 .min((data_size - logical) as usize)
37 / record_size
38 * record_size;
39 if want == 0 {
40 logical += record_size as u64;
41 continue;
42 }
43 chunks.push(Chunk {
44 logical,
45 phys,
46 want,
47 });
48 logical += want as u64;
49 }
50 chunks
51}
52
53pub(super) fn run_chunk_pipeline(
58 volume_path: &str,
59 chunks: &[Chunk],
60 on_chunk: &mut dyn FnMut(usize, &mut [u8]),
61) -> Result<(Duration, u64), NtfsReaderError> {
62 use std::io::{Read, Seek, SeekFrom};
63 use std::sync::mpsc;
64
65 let mut file = open_raw_volume(volume_path)?;
66 let plan: Vec<(u64, usize)> = chunks.iter().map(|c| (c.phys, c.want)).collect();
67 let (full_tx, full_rx) =
68 mpsc::sync_channel::<std::io::Result<(usize, Vec<u8>)>>(PIPELINE_BUFFERS);
69 let (empty_tx, empty_rx) = mpsc::channel::<Vec<u8>>();
70 for _ in 0..PIPELINE_BUFFERS {
71 let _ = empty_tx.send(vec![0u8; SCAN_CHUNK]);
72 }
73
74 let spawned = std::thread::Builder::new()
75 .name("fmf-scan-io".into())
76 .spawn(move || {
77 let mut read_time = Duration::ZERO;
78 for (i, &(phys, want)) in plan.iter().enumerate() {
79 let Ok(mut buf) = empty_rx.recv() else {
80 break; };
82 let t = Instant::now();
83 let read = file
84 .seek(SeekFrom::Start(phys))
85 .and_then(|_| file.read_exact(&mut buf[..want]));
86 read_time += t.elapsed();
87 let failed = read.is_err();
88 if full_tx.send(read.map(|()| (i, buf))).is_err() || failed {
89 break;
90 }
91 }
92 read_time
93 });
94
95 let handle = match spawned {
96 Ok(h) => h,
97 Err(e) => {
98 tracing::warn!(error = %e, "scan I/O thread unavailable — inline sequential reads");
101 let mut file = open_raw_volume(volume_path)?;
102 let mut buf = vec![0u8; SCAN_CHUNK];
103 let mut read_time = Duration::ZERO;
104 for (i, c) in chunks.iter().enumerate() {
105 let t = Instant::now();
106 file.seek(SeekFrom::Start(c.phys))?;
107 file.read_exact(&mut buf[..c.want])?;
108 read_time += t.elapsed();
109 on_chunk(i, &mut buf[..c.want]);
110 }
111 return Ok((read_time, 1));
112 }
113 };
114
115 let mut result: Result<(), NtfsReaderError> = Ok(());
116 for _ in 0..chunks.len() {
117 match full_rx.recv() {
118 Ok(Ok((i, mut buf))) => {
119 on_chunk(i, &mut buf[..chunks[i].want]);
120 let _ = empty_tx.send(buf);
121 }
122 Ok(Err(e)) => {
123 result = Err(e.into());
124 break;
125 }
126 Err(_) => {
127 result = Err(std::io::Error::other("scan I/O thread terminated early").into());
128 break;
129 }
130 }
131 }
132 drop(full_rx);
134 drop(empty_tx);
135 let read_time = handle.join().unwrap_or(Duration::ZERO);
136 result.map(|()| (read_time, 0))
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use crate::index::testutil::TestDir;

    /// Planned chunks are record-aligned, capped at `SCAN_CHUNK`, in ascending
    /// logical order, and cover everything except the hole between the runs.
    #[test]
    fn plan_chunks_is_record_aligned_and_ordered() {
        let record_size = 1024usize;
        // Two runs that leave logical 8 KiB..16 KiB without a mapping; the
        // second run is slightly longer than one full chunk.
        let long_run = SCAN_CHUNK as u64 + 4 * 1024;
        let map = RunMap {
            runs: vec![(0, 4096, 8 * 1024), (16 * 1024, 0, long_run)],
        };
        let data_size = 16 * 1024 + long_run;
        let planned = plan_chunks(&map, data_size, record_size);

        assert!(!planned.is_empty());
        let mut cursor = 0u64;
        for chunk in &planned {
            assert_eq!(chunk.want % record_size, 0, "chunk not record-aligned");
            assert!(chunk.want <= SCAN_CHUNK);
            assert!(chunk.logical >= cursor, "chunks out of order");
            cursor = chunk.logical + chunk.want as u64;
        }
        // Everything but the 8 KiB hole must be planned.
        let planned_bytes = planned.iter().fold(0usize, |acc, chunk| acc + chunk.want);
        assert_eq!(planned_bytes as u64, data_size - 8 * 1024);
    }

    /// Eight chunks flow through the pipeline's smaller buffer pool, each
    /// carrying the bytes found at its physical offset, in plan order.
    #[test]
    fn pipeline_delivers_chunks_in_order_with_recycled_buffers() {
        let record_size = 512usize;
        let dir = TestDir::new();
        let path = dir.join("stream.bin");
        let total = 16 * 1024usize;
        let bytes: Vec<u8> = (0..total).map(|i| (i / 7 % 251) as u8).collect();
        std::fs::write(&path, &bytes).unwrap();

        // Eight 2 KiB runs whose physical order is a rotation of the logical
        // order, so physical offsets differ from logical ones.
        let map = RunMap {
            runs: (0..8u64)
                .map(|i| (i * 2048, ((i + 3) % 8) * 2048, 2048))
                .collect(),
        };
        let chunks = plan_chunks(&map, total as u64, record_size);
        assert_eq!(chunks.len(), 8, "one chunk per contiguous run");

        let volume = path.to_str().unwrap();
        let mut seen = Vec::new();
        let (read_time, fallbacks) = run_chunk_pipeline(volume, &chunks, &mut |i, got| {
            let start = chunks[i].phys as usize;
            let end = start + chunks[i].want;
            assert_eq!(
                got,
                &bytes[start..end],
                "chunk {i} bytes must come from its physical offset"
            );
            seen.push(i);
        })
        .expect("pipeline");

        assert_eq!(seen, (0..8).collect::<Vec<_>>(), "strict chunk order");
        assert_eq!(fallbacks, 0);
        assert!(read_time <= std::time::Duration::from_secs(5));
    }

    /// A chunk that asks for more bytes than the file holds surfaces as an
    /// error, and the callback never runs.
    #[test]
    fn pipeline_propagates_read_errors() {
        let dir = TestDir::new();
        let path = dir.join("short.bin");
        std::fs::write(&path, vec![0u8; 1024]).unwrap();

        let chunks = [Chunk {
            logical: 0,
            phys: 0,
            want: 4096,
        }];
        let mut calls = 0;
        let outcome = run_chunk_pipeline(path.to_str().unwrap(), &chunks, &mut |_, _| calls += 1);

        assert!(outcome.is_err());
        assert_eq!(calls, 0);
    }
}