fmf_core/metrics.rs
//! Engine observability: per-operation traces, ring buffers of recent
//! activity, and log2-bucket latency histograms.
//!
//! Everything here is cheap enough to stay on in production — an `Instant`
//! pair and a few integer adds per operation (no allocation on the hot path
//! beyond the trace struct itself).
7
8use std::collections::VecDeque;
9
10use parking_lot::Mutex;
11use serde::Serialize;
12
13/// Stage breakdown of one query, in microseconds.
14#[derive(Clone, Debug, Default, Serialize)]
15pub struct QueryTrace {
16 /// The raw query text this trace measured.
17 pub query: String,
18 /// Which execution strategy drove candidate generation (visualized in
19 /// the perf panel): e.g. "full-scan", "pool-scan", "suffix", "perm-walk".
20 pub driver: String,
21 /// Per-volume query-cache outcome: "miss", "refine" (all volumes
22 /// narrowed the previous result) or "partial" (mixed).
23 pub cache: String,
24 /// True when this query is identical (text + options) to the previous
25 /// one on every volume *and* produced identical id lists — the UI keeps
26 /// the displayed result instead of re-publishing (no repaint churn from
27 /// idle USN traffic).
28 pub unchanged: bool,
29 /// Query parse time, in microseconds.
30 pub parse_us: u64,
31 /// Query compile time, in microseconds.
32 pub compile_us: u64,
33 /// Dir-path memo (only path queries; 0 when cached/warm).
34 pub memo_us: u64,
35 /// Candidate-generation scan time, in microseconds.
36 pub scan_us: u64,
37 /// Result-row materialization time, in microseconds.
38 pub materialize_us: u64,
39 /// Multi-volume k-way merge.
40 pub merge_us: u64,
41 /// End-to-end query time, in microseconds.
42 pub total_us: u64,
43 /// Number of index entries examined during scanning.
44 pub entries_scanned: u64,
45 /// Number of entries skipped by exclusion rules.
46 pub excluded_skipped: u64,
47 /// Number of matching entries returned.
48 pub hits: u64,
49 /// Number of volumes queried.
50 pub volumes: u32,
51}
52
53/// One index-established event of a volume: an initial scan/rescan ("scan")
54/// or a snapshot restore ("snapshot"). Sharing one timeline keeps the ≤2s
55/// restore gate visible next to full-scan costs.
56#[derive(Clone, Debug, Default, Serialize)]
57pub struct ScanTrace {
58 /// The volume this event established (e.g. "C:").
59 pub volume: String,
60 /// "scan" | "snapshot".
61 pub source: String,
62 /// Bytes read from the MFT / snapshot.
63 pub read_bytes: u64,
64 /// Raw read time, in milliseconds.
65 pub read_ms: u64,
66 /// Read throughput, in megabytes per second.
67 pub mb_per_s: f64,
68 /// MFT-record parse time, in milliseconds.
69 pub parse_ms: u64,
70 /// Deferred $`ATTRIBUTE_LIST` name resolution.
71 pub deferred_ms: u64,
72 /// Index-build time, in milliseconds.
73 pub build_ms: u64,
74 /// Sort time, in milliseconds.
75 pub sort_ms: u64,
76 /// End-to-end time to establish the index, in milliseconds.
77 pub total_ms: u64,
78 /// Number of index entries established.
79 pub entries: u64,
80 /// Peak process working set during the event, in bytes.
81 pub peak_ws_bytes: u64,
82}
83
84/// One applied USN batch.
85#[derive(Clone, Debug, Default, Serialize)]
86pub struct UsnTrace {
87 /// The volume this USN batch was applied to (e.g. "C:").
88 pub volume: String,
89 /// Number of USN records in the batch.
90 pub records: u64,
91 /// Number of entries inserted or updated.
92 pub upserted: u64,
93 /// Number of entries removed (tombstoned).
94 pub deleted: u64,
95 /// Number of entries whose size/mtime stat was refreshed.
96 pub stat_updated: u64,
97 /// Number of stat refreshes that failed.
98 pub stat_failures: u64,
99 /// Time to apply the batch to the index, in microseconds.
100 pub apply_us: u64,
101}
102
103/// Per-column memory accounting for one volume index.
104#[derive(Clone, Debug, Default, Serialize)]
105pub struct IndexStats {
106 /// The volume this index covers (e.g. "C:").
107 pub volume: String,
108 /// Total rows in the index (live + tombstones).
109 pub entries: u64,
110 /// Number of live (non-tombstoned) rows.
111 pub live_entries: u64,
112 /// Number of tombstoned (deleted) rows.
113 pub tombstones: u64,
114 /// Bytes held by the original-case name pool.
115 pub name_pool_bytes: u64,
116 /// Bytes held by the case-folded (lowercase) name pool.
117 pub lower_pool_bytes: u64,
118 /// Bytes held by the name-pool offset table.
119 pub offsets_bytes: u64,
120 /// Bytes held by the parent-pointer column.
121 pub parent_bytes: u64,
122 /// Bytes held by the file-size column.
123 pub size_bytes: u64,
124 /// Bytes held by the modification-time column.
125 pub mtime_bytes: u64,
126 /// Bytes held by the File Reference Number column.
127 pub frn_bytes: u64,
128 /// Bytes held by the per-entry flag column.
129 pub flag_bytes: u64,
130 /// Bytes held by the sort permutations.
131 pub permutations_bytes: u64,
132 /// Bytes held by the FRN-to-row lookup map.
133 pub frn_map_bytes: u64,
134 /// Abandoned name bytes across both pools (tombstoned rows, in-place
135 /// dir renames: the folded copy always, the original copy when one
136 /// existed) — the reclaimable garbage. Compaction-trigger input; a
137 /// lower bound right after a snapshot restore.
138 pub dead_name_bytes: u64,
139 /// `dead_name_bytes / (name_pool + lower_pool)`.
140 pub pool_garbage_ratio: f64,
141 /// Generation-cached query accelerators (offset table, dir-path memo).
142 /// Part of the bytes/entry gate — they live in the engine process.
143 pub derived_cache_bytes: u64,
144 /// Total resident bytes for this index (sum of all columns + caches).
145 pub total_bytes: u64,
146 /// `total_bytes / entries` — the bytes/entry gate metric.
147 pub bytes_per_entry: f64,
148 /// Content generation counter (bumps on name/data changes).
149 pub content_generation: u64,
150 /// Structural generation counter (bumps on add/remove/rename).
151 pub structural_generation: u64,
152}
153
154impl IndexStats {
155 /// Fold the derived-cache footprint in (the index module cannot compute
156 /// it itself — the cached types belong to the query layer).
157 pub fn add_derived_bytes(&mut self, bytes: u64) {
158 self.derived_cache_bytes = bytes;
159 self.total_bytes += bytes;
160 self.bytes_per_entry = if self.entries > 0 {
161 self.total_bytes as f64 / self.entries as f64
162 } else {
163 0.0
164 };
165 }
166}
167
168/// Log2-bucketed microsecond histogram: bucket i counts values in
169/// [2^i, 2^(i+1)) µs. 32 buckets cover > an hour.
170#[derive(Clone, Debug, Default, Serialize)]
171pub struct Histogram {
172 /// Per-bucket counts; bucket i covers [2^i, 2^(i+1)) µs (length 32).
173 pub buckets: Vec<u64>, // length 32
174 /// Total number of recorded values.
175 pub count: u64,
176 /// Sum of all recorded values, in microseconds.
177 pub sum_us: u64,
178 /// Largest recorded value, in microseconds.
179 pub max_us: u64,
180}
181
182impl Histogram {
183 /// Create an empty histogram with 32 zeroed buckets.
184 #[must_use]
185 pub fn new() -> Self {
186 Self {
187 buckets: vec![0; 32],
188 ..Default::default()
189 }
190 }
191
192 /// Record a value (in microseconds) into its log2 bucket and update totals.
193 pub fn record(&mut self, us: u64) {
194 let b = (64 - us.max(1).leading_zeros() as usize - 1).min(31);
195 self.buckets[b] += 1;
196 self.count += 1;
197 self.sum_us += us;
198 self.max_us = self.max_us.max(us);
199 }
200
201 /// Approximate percentile (upper bound of the containing bucket).
202 #[must_use]
203 pub fn percentile_us(&self, p: f64) -> u64 {
204 if self.count == 0 {
205 return 0;
206 }
207 let target = ((self.count as f64) * p).ceil() as u64;
208 let mut seen = 0;
209 for (i, &c) in self.buckets.iter().enumerate() {
210 seen += c;
211 if seen >= target {
212 return 1u64 << (i + 1);
213 }
214 }
215 self.max_us
216 }
217}
218
/// Degradation counters — "this happened N times" facts that would
/// otherwise vanish into fallback paths. Plain atomics, always on.
#[derive(Default, Debug)]
pub struct Counters {
    /// A per-entry size/mtime stat fetch failed.
    pub stat_fetch_failures: std::sync::atomic::AtomicU64,
    /// A USN batch was truncated (records dropped before apply).
    pub usn_batches_truncated: std::sync::atomic::AtomicU64,
    /// A snapshot failed to load (fell back to a full scan).
    pub snapshot_load_failures: std::sync::atomic::AtomicU64,
    /// A snapshot failed to save.
    pub snapshot_save_failures: std::sync::atomic::AtomicU64,
    /// A deferred `$ATTRIBUTE_LIST` name could not be resolved.
    pub deferred_names_unresolved: std::sync::atomic::AtomicU64,
    /// A corrupt MFT record was encountered and skipped.
    pub corrupt_mft_records: std::sync::atomic::AtomicU64,
    /// The USN journal was rescanned from scratch (gap recovery).
    pub journal_rescans: std::sync::atomic::AtomicU64,
    /// The scan pipeline fell back to a slower path.
    pub scan_pipeline_fallbacks: std::sync::atomic::AtomicU64,
    /// A compacted copy was discarded because the index mutated under it.
    /// That cannot happen while the volume thread is the only writer, so a
    /// nonzero value means that invariant broke somewhere.
    pub compaction_aborts: std::sync::atomic::AtomicU64,
    /// Pipe server (fmf-service): a frame failed validation and the
    /// connection was dropped.
    pub pipe_malformed_frames: std::sync::atomic::AtomicU64,
    /// Pipe server: a subscriber's bounded event queue overflowed and the
    /// oldest event was dropped.
    pub pipe_events_dropped: std::sync::atomic::AtomicU64,
    /// Pipe server: a client was turned away at the instance cap.
    pub pipe_connections_rejected: std::sync::atomic::AtomicU64,
    /// Scan: the extension-record name cache hit its capacity; remaining
    /// deferred names fall back to per-record disk reads.
    pub deferred_name_cache_overflow: std::sync::atomic::AtomicU64,
    /// Scan: a deferred-name disk read failed (the entry keeps a
    /// placeholder name until the next rescan).
    pub deferred_name_read_failures: std::sync::atomic::AtomicU64,
    /// Pipe server: a result handle was LRU-evicted at the per-connection
    /// cap; its next page fetch answers STALE("evicted").
    pub pipe_results_evicted: std::sync::atomic::AtomicU64,
    /// `QueryTrace` JSON serialization failed; the response carried an empty
    /// trace (the query itself succeeded).
    pub trace_serialize_failures: std::sync::atomic::AtomicU64,
    /// Scope walk (ADR-0024): paths skipped because they could not be read
    /// (permission denied or vanished mid-walk) — silent data loss otherwise.
    pub walk_read_errors: std::sync::atomic::AtomicU64,
    /// Scope walk: subtrees not descended because they hit the depth cap.
    pub walk_depth_truncated: std::sync::atomic::AtomicU64,
}
269
270/// Plain-integer, JSON-serializable copy of `Counters` for the FFI/UI.
271#[derive(Clone, Debug, Default, Serialize)]
272pub struct CountersSnapshot {
273 /// Times a per-entry size/mtime stat fetch failed.
274 pub stat_fetch_failures: u64,
275 /// Times a USN batch was truncated (records dropped before apply).
276 pub usn_batches_truncated: u64,
277 /// Times a snapshot failed to load (fell back to a full scan).
278 pub snapshot_load_failures: u64,
279 /// Times a snapshot failed to save.
280 pub snapshot_save_failures: u64,
281 /// Times a deferred $`ATTRIBUTE_LIST` name could not be resolved.
282 pub deferred_names_unresolved: u64,
283 /// Times a corrupt MFT record was encountered and skipped.
284 pub corrupt_mft_records: u64,
285 /// Times the USN journal was rescanned from scratch (gap recovery).
286 pub journal_rescans: u64,
287 /// Times the scan pipeline fell back to a slower path.
288 pub scan_pipeline_fallbacks: u64,
289 /// Times the query-layer offset table had to be rebuilt as a fallback.
290 pub offset_table_rebuild_fallbacks: u64,
291 /// Times a lazy permutation had to be rebuilt as a fallback.
292 pub lazy_perm_rebuild_fallbacks: u64,
293 /// A compacted copy was discarded because the index mutated under it —
294 /// impossible while the volume thread is the only writer; nonzero means
295 /// that invariant broke somewhere.
296 pub compaction_aborts: u64,
297 /// Pipe server (fmf-service): a frame failed validation and the
298 /// connection was dropped.
299 pub pipe_malformed_frames: u64,
300 /// Pipe server: a subscriber's bounded event queue overflowed and the
301 /// oldest event was dropped.
302 pub pipe_events_dropped: u64,
303 /// Pipe server: a client was turned away at the instance cap.
304 pub pipe_connections_rejected: u64,
305 /// Scan: the extension-record name cache hit its capacity; remaining
306 /// deferred names fall back to per-record disk reads.
307 pub deferred_name_cache_overflow: u64,
308 /// Scan: a deferred-name disk read failed (the entry keeps a
309 /// placeholder name until the next rescan).
310 pub deferred_name_read_failures: u64,
311 /// Pipe server: a result handle was LRU-evicted at the per-connection
312 /// cap; its next page fetch answers STALE("evicted").
313 pub pipe_results_evicted: u64,
314 /// `QueryTrace` JSON serialization failed; the response carried an empty
315 /// trace (the query itself succeeded).
316 pub trace_serialize_failures: u64,
317 /// Scope walk (ADR-0024): paths skipped because they could not be read.
318 pub walk_read_errors: u64,
319 /// Scope walk: subtrees not descended because they hit the depth cap.
320 pub walk_depth_truncated: u64,
321}
322
// The query layer has no `MetricsHub` handle (its degradations normally go
// through the diag ring — see memo.rs), so the two counters below are
// process globals folded into every snapshot.

/// Process-wide count of offset-table rebuild fallbacks.
static OFFSET_TABLE_REBUILD_FALLBACKS: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(0);
/// Process-wide count of lazy-permutation rebuild fallbacks.
static LAZY_PERM_REBUILD_FALLBACKS: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(0);
330
331impl Counters {
332 /// Increment a counter by one (relaxed atomic).
333 pub fn bump(counter: &std::sync::atomic::AtomicU64) {
334 counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
335 }
336
337 pub(crate) fn bump_offset_table_rebuild_fallbacks() {
338 OFFSET_TABLE_REBUILD_FALLBACKS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
339 }
340
341 pub(crate) fn bump_lazy_perm_rebuild_fallbacks() {
342 LAZY_PERM_REBUILD_FALLBACKS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
343 }
344
345 /// Add `n` to a counter (relaxed atomic).
346 pub fn add(counter: &std::sync::atomic::AtomicU64, n: u64) {
347 counter.fetch_add(n, std::sync::atomic::Ordering::Relaxed);
348 }
349
350 /// Read all counters into a plain-integer `CountersSnapshot`.
351 pub fn snapshot(&self) -> CountersSnapshot {
352 use std::sync::atomic::Ordering::Relaxed;
353 CountersSnapshot {
354 stat_fetch_failures: self.stat_fetch_failures.load(Relaxed),
355 usn_batches_truncated: self.usn_batches_truncated.load(Relaxed),
356 snapshot_load_failures: self.snapshot_load_failures.load(Relaxed),
357 snapshot_save_failures: self.snapshot_save_failures.load(Relaxed),
358 deferred_names_unresolved: self.deferred_names_unresolved.load(Relaxed),
359 corrupt_mft_records: self.corrupt_mft_records.load(Relaxed),
360 journal_rescans: self.journal_rescans.load(Relaxed),
361 scan_pipeline_fallbacks: self.scan_pipeline_fallbacks.load(Relaxed),
362 offset_table_rebuild_fallbacks: OFFSET_TABLE_REBUILD_FALLBACKS.load(Relaxed),
363 lazy_perm_rebuild_fallbacks: LAZY_PERM_REBUILD_FALLBACKS.load(Relaxed),
364 compaction_aborts: self.compaction_aborts.load(Relaxed),
365 pipe_malformed_frames: self.pipe_malformed_frames.load(Relaxed),
366 pipe_events_dropped: self.pipe_events_dropped.load(Relaxed),
367 pipe_connections_rejected: self.pipe_connections_rejected.load(Relaxed),
368 deferred_name_cache_overflow: self.deferred_name_cache_overflow.load(Relaxed),
369 deferred_name_read_failures: self.deferred_name_read_failures.load(Relaxed),
370 pipe_results_evicted: self.pipe_results_evicted.load(Relaxed),
371 trace_serialize_failures: self.trace_serialize_failures.load(Relaxed),
372 walk_read_errors: self.walk_read_errors.load(Relaxed),
373 walk_depth_truncated: self.walk_depth_truncated.load(Relaxed),
374 }
375 }
376}
377
378/// Aggregated, JSON-serializable snapshot for the FFI/UI.
379#[derive(Clone, Debug, Default, Serialize)]
380pub struct MetricsSnapshot {
381 /// Most recent query traces, newest last.
382 pub recent_queries: Vec<QueryTrace>,
383 /// Latency histogram across all recorded queries.
384 pub query_histogram: Histogram,
385 /// 50th-percentile query latency, in microseconds.
386 pub p50_us: u64,
387 /// 99th-percentile query latency, in microseconds.
388 pub p99_us: u64,
389 /// Most recent applied USN batches.
390 pub recent_usn: Vec<UsnTrace>,
391 /// Most recent index-established (scan/snapshot) events.
392 pub scans: Vec<ScanTrace>,
393 /// Per-volume memory accounting for each live index.
394 pub indexes: Vec<IndexStats>,
395 /// Process-wide degradation counters.
396 pub counters: CountersSnapshot,
397 /// WARN+ events and panics (diag ring), oldest first.
398 pub recent_errors: Vec<crate::diag::ErrorEvent>,
399}
400
/// Capacity of the recent-query ring.
const RING: usize = 256;
/// Capacity of the recent-USN-batch ring.
const USN_RING: usize = 64;
/// Capacity of the index-established (scan/snapshot) ring.
const SCAN_RING: usize = 64;
404
405/// Thread-safe metrics collector owned by the engine.
406#[derive(Default)]
407pub struct MetricsHub {
408 queries: Mutex<VecDeque<QueryTrace>>,
409 histogram: Mutex<Histogram>,
410 usn: Mutex<VecDeque<UsnTrace>>,
411 scans: Mutex<VecDeque<ScanTrace>>,
412 /// Process-wide degradation counters.
413 pub counters: Counters,
414}
415
416impl MetricsHub {
417 /// Create an empty hub with a fresh 32-bucket histogram.
418 #[must_use]
419 pub fn new() -> Self {
420 Self {
421 histogram: Mutex::new(Histogram::new()),
422 ..Default::default()
423 }
424 }
425
426 /// Record a query trace into the ring and its latency into the histogram.
427 pub fn record_query(&self, trace: QueryTrace) {
428 self.histogram.lock().record(trace.total_us);
429 let mut q = self.queries.lock();
430 if q.len() == RING {
431 q.pop_front();
432 }
433 q.push_back(trace);
434 }
435
436 /// Record a USN-batch trace into the ring.
437 pub fn record_usn(&self, trace: UsnTrace) {
438 let mut u = self.usn.lock();
439 if u.len() == USN_RING {
440 u.pop_front();
441 }
442 u.push_back(trace);
443 }
444
445 /// Record an index-established (scan/snapshot) trace into the ring.
446 pub fn record_scan(&self, trace: ScanTrace) {
447 let mut s = self.scans.lock();
448 if s.len() == SCAN_RING {
449 s.pop_front();
450 }
451 s.push_back(trace);
452 }
453
454 /// The most recently recorded query trace, if any.
455 pub fn last_query(&self) -> Option<QueryTrace> {
456 self.queries.lock().back().cloned()
457 }
458
459 /// Snapshot with the most recent `recent` queries (newest last).
460 pub fn snapshot(&self, recent: usize, indexes: Vec<IndexStats>) -> MetricsSnapshot {
461 let hist = self.histogram.lock().clone();
462 MetricsSnapshot {
463 recent_queries: {
464 let q = self.queries.lock();
465 q.iter().rev().take(recent).rev().cloned().collect()
466 },
467 p50_us: hist.percentile_us(0.50),
468 p99_us: hist.percentile_us(0.99),
469 query_histogram: hist,
470 recent_usn: self.usn.lock().iter().cloned().collect(),
471 scans: self.scans.lock().iter().cloned().collect(),
472 indexes,
473 counters: self.counters.snapshot(),
474 recent_errors: crate::diag::recent_errors(),
475 }
476 }
477}
478
/// Microsecond stopwatch for stage timing.
#[derive(Debug)]
pub struct Stage(std::time::Instant);

impl Stage {
    /// Start a stopwatch at the current instant.
    #[must_use]
    pub fn start() -> Self {
        Self(std::time::Instant::now())
    }

    /// Elapsed µs and restart — chain stages with one clock.
    ///
    /// The clock is read exactly once: the instant that ends this lap is the
    /// instant the next one starts from, so no time falls between
    /// consecutive laps.
    pub fn lap(&mut self) -> u64 {
        let now = std::time::Instant::now();
        let us = Self::micros(now.duration_since(self.0));
        self.0 = now;
        us
    }

    /// Elapsed time since start (or last lap), in microseconds.
    #[must_use]
    pub fn elapsed_us(&self) -> u64 {
        Self::micros(self.0.elapsed())
    }

    /// Whole microseconds in `elapsed`, saturating at `u64::MAX` instead of
    /// truncating the 128-bit count.
    fn micros(elapsed: std::time::Duration) -> u64 {
        elapsed.as_micros().min(u128::from(u64::MAX)) as u64
    }
}
502
#[cfg(test)]
mod tests {
    use super::*;

    /// Recorded values update the totals and yield bucket-bound percentiles.
    #[test]
    fn histogram_buckets_and_percentiles() {
        let samples = [1u64, 2, 3, 100, 1000, 10_000];
        let mut hist = Histogram::new();
        for &us in &samples {
            hist.record(us);
        }
        assert_eq!(hist.count, 6);
        assert_eq!(hist.max_us, 10_000);
        // p50 lands in a small bucket, p99 near the max bucket.
        let p50 = hist.percentile_us(0.5);
        let p99 = hist.percentile_us(0.99);
        assert!(p50 <= 8);
        assert!(p99 >= 8192);
    }

    /// Rings stop growing at their capacity and keep the newest entries.
    #[test]
    fn ring_buffer_caps() {
        let hub = MetricsHub::new();
        (0..300).for_each(|i| {
            let query = QueryTrace {
                total_us: i,
                ..Default::default()
            };
            let scan = ScanTrace {
                total_ms: i,
                ..Default::default()
            };
            hub.record_query(query);
            hub.record_scan(scan);
        });
        let snap = hub.snapshot(16, Vec::new());
        assert_eq!(snap.query_histogram.count, 300);
        assert_eq!(snap.recent_queries.len(), 16);
        // Newest last.
        let newest_query = snap.recent_queries.last().unwrap();
        assert_eq!(newest_query.total_us, 299);
        // Scans are a ring too — a long-lived process must not grow it.
        assert_eq!(snap.scans.len(), SCAN_RING);
        let newest_scan = snap.scans.last().unwrap();
        assert_eq!(newest_scan.total_ms, 299);
    }
}
542}