Skip to main content

fmf_core/
metrics.rs

1//! Engine observability: per-operation traces, ring buffers of recent
2//! activity, and log2-bucket latency histograms.
3//!
4//! Everything here is cheap enough to stay on in production — an `Instant`
5//! pair and a few integer adds per operation (no allocation on the hot path
6//! beyond the trace struct itself).
7
8use std::collections::VecDeque;
9
10use parking_lot::Mutex;
11use serde::Serialize;
12
13/// Stage breakdown of one query, in microseconds.
14#[derive(Clone, Debug, Default, Serialize)]
15pub struct QueryTrace {
16    /// The raw query text this trace measured.
17    pub query: String,
18    /// Which execution strategy drove candidate generation (visualized in
19    /// the perf panel): e.g. "full-scan", "pool-scan", "suffix", "perm-walk".
20    pub driver: String,
21    /// Per-volume query-cache outcome: "miss", "refine" (all volumes
22    /// narrowed the previous result) or "partial" (mixed).
23    pub cache: String,
24    /// True when this query is identical (text + options) to the previous
25    /// one on every volume *and* produced identical id lists — the UI keeps
26    /// the displayed result instead of re-publishing (no repaint churn from
27    /// idle USN traffic).
28    pub unchanged: bool,
29    /// Query parse time, in microseconds.
30    pub parse_us: u64,
31    /// Query compile time, in microseconds.
32    pub compile_us: u64,
33    /// Dir-path memo (only path queries; 0 when cached/warm).
34    pub memo_us: u64,
35    /// Candidate-generation scan time, in microseconds.
36    pub scan_us: u64,
37    /// Result-row materialization time, in microseconds.
38    pub materialize_us: u64,
39    /// Multi-volume k-way merge.
40    pub merge_us: u64,
41    /// End-to-end query time, in microseconds.
42    pub total_us: u64,
43    /// Number of index entries examined during scanning.
44    pub entries_scanned: u64,
45    /// Number of entries skipped by exclusion rules.
46    pub excluded_skipped: u64,
47    /// Number of matching entries returned.
48    pub hits: u64,
49    /// Number of volumes queried.
50    pub volumes: u32,
51}
52
53/// One index-established event of a volume: an initial scan/rescan ("scan")
54/// or a snapshot restore ("snapshot"). Sharing one timeline keeps the ≤2s
55/// restore gate visible next to full-scan costs.
56#[derive(Clone, Debug, Default, Serialize)]
57pub struct ScanTrace {
58    /// The volume this event established (e.g. "C:").
59    pub volume: String,
60    /// "scan" | "snapshot".
61    pub source: String,
62    /// Bytes read from the MFT / snapshot.
63    pub read_bytes: u64,
64    /// Raw read time, in milliseconds.
65    pub read_ms: u64,
66    /// Read throughput, in megabytes per second.
67    pub mb_per_s: f64,
68    /// MFT-record parse time, in milliseconds.
69    pub parse_ms: u64,
70    /// Deferred $`ATTRIBUTE_LIST` name resolution.
71    pub deferred_ms: u64,
72    /// Index-build time, in milliseconds.
73    pub build_ms: u64,
74    /// Sort time, in milliseconds.
75    pub sort_ms: u64,
76    /// End-to-end time to establish the index, in milliseconds.
77    pub total_ms: u64,
78    /// Number of index entries established.
79    pub entries: u64,
80    /// Peak process working set during the event, in bytes.
81    pub peak_ws_bytes: u64,
82}
83
84/// One applied USN batch.
85#[derive(Clone, Debug, Default, Serialize)]
86pub struct UsnTrace {
87    /// The volume this USN batch was applied to (e.g. "C:").
88    pub volume: String,
89    /// Number of USN records in the batch.
90    pub records: u64,
91    /// Number of entries inserted or updated.
92    pub upserted: u64,
93    /// Number of entries removed (tombstoned).
94    pub deleted: u64,
95    /// Number of entries whose size/mtime stat was refreshed.
96    pub stat_updated: u64,
97    /// Number of stat refreshes that failed.
98    pub stat_failures: u64,
99    /// Time to apply the batch to the index, in microseconds.
100    pub apply_us: u64,
101}
102
103/// Per-column memory accounting for one volume index.
104#[derive(Clone, Debug, Default, Serialize)]
105pub struct IndexStats {
106    /// The volume this index covers (e.g. "C:").
107    pub volume: String,
108    /// Total rows in the index (live + tombstones).
109    pub entries: u64,
110    /// Number of live (non-tombstoned) rows.
111    pub live_entries: u64,
112    /// Number of tombstoned (deleted) rows.
113    pub tombstones: u64,
114    /// Bytes held by the original-case name pool.
115    pub name_pool_bytes: u64,
116    /// Bytes held by the case-folded (lowercase) name pool.
117    pub lower_pool_bytes: u64,
118    /// Bytes held by the name-pool offset table.
119    pub offsets_bytes: u64,
120    /// Bytes held by the parent-pointer column.
121    pub parent_bytes: u64,
122    /// Bytes held by the file-size column.
123    pub size_bytes: u64,
124    /// Bytes held by the modification-time column.
125    pub mtime_bytes: u64,
126    /// Bytes held by the File Reference Number column.
127    pub frn_bytes: u64,
128    /// Bytes held by the per-entry flag column.
129    pub flag_bytes: u64,
130    /// Bytes held by the sort permutations.
131    pub permutations_bytes: u64,
132    /// Bytes held by the FRN-to-row lookup map.
133    pub frn_map_bytes: u64,
134    /// Abandoned name bytes across both pools (tombstoned rows, in-place
135    /// dir renames: the folded copy always, the original copy when one
136    /// existed) — the reclaimable garbage. Compaction-trigger input; a
137    /// lower bound right after a snapshot restore.
138    pub dead_name_bytes: u64,
139    /// `dead_name_bytes / (name_pool + lower_pool)`.
140    pub pool_garbage_ratio: f64,
141    /// Generation-cached query accelerators (offset table, dir-path memo).
142    /// Part of the bytes/entry gate — they live in the engine process.
143    pub derived_cache_bytes: u64,
144    /// Total resident bytes for this index (sum of all columns + caches).
145    pub total_bytes: u64,
146    /// `total_bytes / entries` — the bytes/entry gate metric.
147    pub bytes_per_entry: f64,
148    /// Content generation counter (bumps on name/data changes).
149    pub content_generation: u64,
150    /// Structural generation counter (bumps on add/remove/rename).
151    pub structural_generation: u64,
152}
153
154impl IndexStats {
155    /// Fold the derived-cache footprint in (the index module cannot compute
156    /// it itself — the cached types belong to the query layer).
157    pub fn add_derived_bytes(&mut self, bytes: u64) {
158        self.derived_cache_bytes = bytes;
159        self.total_bytes += bytes;
160        self.bytes_per_entry = if self.entries > 0 {
161            self.total_bytes as f64 / self.entries as f64
162        } else {
163            0.0
164        };
165    }
166}
167
168/// Log2-bucketed microsecond histogram: bucket i counts values in
169/// [2^i, 2^(i+1)) µs. 32 buckets cover > an hour.
170#[derive(Clone, Debug, Default, Serialize)]
171pub struct Histogram {
172    /// Per-bucket counts; bucket i covers [2^i, 2^(i+1)) µs (length 32).
173    pub buckets: Vec<u64>, // length 32
174    /// Total number of recorded values.
175    pub count: u64,
176    /// Sum of all recorded values, in microseconds.
177    pub sum_us: u64,
178    /// Largest recorded value, in microseconds.
179    pub max_us: u64,
180}
181
182impl Histogram {
183    /// Create an empty histogram with 32 zeroed buckets.
184    #[must_use]
185    pub fn new() -> Self {
186        Self {
187            buckets: vec![0; 32],
188            ..Default::default()
189        }
190    }
191
192    /// Record a value (in microseconds) into its log2 bucket and update totals.
193    pub fn record(&mut self, us: u64) {
194        let b = (64 - us.max(1).leading_zeros() as usize - 1).min(31);
195        self.buckets[b] += 1;
196        self.count += 1;
197        self.sum_us += us;
198        self.max_us = self.max_us.max(us);
199    }
200
201    /// Approximate percentile (upper bound of the containing bucket).
202    #[must_use]
203    pub fn percentile_us(&self, p: f64) -> u64 {
204        if self.count == 0 {
205            return 0;
206        }
207        let target = ((self.count as f64) * p).ceil() as u64;
208        let mut seen = 0;
209        for (i, &c) in self.buckets.iter().enumerate() {
210            seen += c;
211            if seen >= target {
212                return 1u64 << (i + 1);
213            }
214        }
215        self.max_us
216    }
217}
218
/// Degradation counters: "this happened N times" facts that would
/// otherwise disappear into fallback paths. Plain atomics, always on.
#[derive(Default, Debug)]
pub struct Counters {
    /// A per-entry size/mtime stat fetch failed.
    pub stat_fetch_failures: std::sync::atomic::AtomicU64,
    /// A USN batch was truncated (records dropped before apply).
    pub usn_batches_truncated: std::sync::atomic::AtomicU64,
    /// A snapshot failed to load (fell back to a full scan).
    pub snapshot_load_failures: std::sync::atomic::AtomicU64,
    /// A snapshot failed to save.
    pub snapshot_save_failures: std::sync::atomic::AtomicU64,
    /// A deferred $`ATTRIBUTE_LIST` name could not be resolved.
    pub deferred_names_unresolved: std::sync::atomic::AtomicU64,
    /// A corrupt MFT record was encountered and skipped.
    pub corrupt_mft_records: std::sync::atomic::AtomicU64,
    /// The USN journal was rescanned from scratch (gap recovery).
    pub journal_rescans: std::sync::atomic::AtomicU64,
    /// The scan pipeline fell back to a slower path.
    pub scan_pipeline_fallbacks: std::sync::atomic::AtomicU64,
    /// A compacted copy was thrown away because the index mutated under
    /// it. That cannot happen while the volume thread is the only writer,
    /// so a nonzero value means the invariant broke somewhere.
    pub compaction_aborts: std::sync::atomic::AtomicU64,
    /// Pipe server (fmf-service): a frame failed validation and the
    /// connection was dropped.
    pub pipe_malformed_frames: std::sync::atomic::AtomicU64,
    /// Pipe server: a subscriber's bounded event queue overflowed and the
    /// oldest event was dropped.
    pub pipe_events_dropped: std::sync::atomic::AtomicU64,
    /// Pipe server: a client was turned away at the instance cap.
    pub pipe_connections_rejected: std::sync::atomic::AtomicU64,
    /// Scan: the extension-record name cache reached capacity; the
    /// remaining deferred names fall back to per-record disk reads.
    pub deferred_name_cache_overflow: std::sync::atomic::AtomicU64,
    /// Scan: a deferred-name disk read failed (the entry keeps a
    /// placeholder name until the next rescan).
    pub deferred_name_read_failures: std::sync::atomic::AtomicU64,
    /// Pipe server: a result handle was LRU-evicted at the per-connection
    /// cap; its next page fetch answers STALE("evicted").
    pub pipe_results_evicted: std::sync::atomic::AtomicU64,
    /// `QueryTrace` JSON serialization failed; the response carried an
    /// empty trace (the query itself succeeded).
    pub trace_serialize_failures: std::sync::atomic::AtomicU64,
    /// Scope walk (ADR-0024): paths skipped because they could not be read
    /// (permission denied or vanished mid-walk) — silent data loss
    /// otherwise.
    pub walk_read_errors: std::sync::atomic::AtomicU64,
    /// Scope walk: subtrees not descended because they hit the depth cap.
    pub walk_depth_truncated: std::sync::atomic::AtomicU64,
}
269
270/// Plain-integer, JSON-serializable copy of `Counters` for the FFI/UI.
271#[derive(Clone, Debug, Default, Serialize)]
272pub struct CountersSnapshot {
273    /// Times a per-entry size/mtime stat fetch failed.
274    pub stat_fetch_failures: u64,
275    /// Times a USN batch was truncated (records dropped before apply).
276    pub usn_batches_truncated: u64,
277    /// Times a snapshot failed to load (fell back to a full scan).
278    pub snapshot_load_failures: u64,
279    /// Times a snapshot failed to save.
280    pub snapshot_save_failures: u64,
281    /// Times a deferred $`ATTRIBUTE_LIST` name could not be resolved.
282    pub deferred_names_unresolved: u64,
283    /// Times a corrupt MFT record was encountered and skipped.
284    pub corrupt_mft_records: u64,
285    /// Times the USN journal was rescanned from scratch (gap recovery).
286    pub journal_rescans: u64,
287    /// Times the scan pipeline fell back to a slower path.
288    pub scan_pipeline_fallbacks: u64,
289    /// Times the query-layer offset table had to be rebuilt as a fallback.
290    pub offset_table_rebuild_fallbacks: u64,
291    /// Times a lazy permutation had to be rebuilt as a fallback.
292    pub lazy_perm_rebuild_fallbacks: u64,
293    /// A compacted copy was discarded because the index mutated under it —
294    /// impossible while the volume thread is the only writer; nonzero means
295    /// that invariant broke somewhere.
296    pub compaction_aborts: u64,
297    /// Pipe server (fmf-service): a frame failed validation and the
298    /// connection was dropped.
299    pub pipe_malformed_frames: u64,
300    /// Pipe server: a subscriber's bounded event queue overflowed and the
301    /// oldest event was dropped.
302    pub pipe_events_dropped: u64,
303    /// Pipe server: a client was turned away at the instance cap.
304    pub pipe_connections_rejected: u64,
305    /// Scan: the extension-record name cache hit its capacity; remaining
306    /// deferred names fall back to per-record disk reads.
307    pub deferred_name_cache_overflow: u64,
308    /// Scan: a deferred-name disk read failed (the entry keeps a
309    /// placeholder name until the next rescan).
310    pub deferred_name_read_failures: u64,
311    /// Pipe server: a result handle was LRU-evicted at the per-connection
312    /// cap; its next page fetch answers STALE("evicted").
313    pub pipe_results_evicted: u64,
314    /// `QueryTrace` JSON serialization failed; the response carried an empty
315    /// trace (the query itself succeeded).
316    pub trace_serialize_failures: u64,
317    /// Scope walk (ADR-0024): paths skipped because they could not be read.
318    pub walk_read_errors: u64,
319    /// Scope walk: subtrees not descended because they hit the depth cap.
320    pub walk_depth_truncated: u64,
321}
322
// The query layer holds no `MetricsHub` handle (its degradations normally
// go through the diag ring — see memo.rs), so the two counters below are
// process globals that get folded into every counters snapshot.

/// Process-global count of offset-table rebuild fallbacks.
static OFFSET_TABLE_REBUILD_FALLBACKS: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(0);
/// Process-global count of lazy-permutation rebuild fallbacks.
static LAZY_PERM_REBUILD_FALLBACKS: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(0);
330
331impl Counters {
332    /// Increment a counter by one (relaxed atomic).
333    pub fn bump(counter: &std::sync::atomic::AtomicU64) {
334        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
335    }
336
337    pub(crate) fn bump_offset_table_rebuild_fallbacks() {
338        OFFSET_TABLE_REBUILD_FALLBACKS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
339    }
340
341    pub(crate) fn bump_lazy_perm_rebuild_fallbacks() {
342        LAZY_PERM_REBUILD_FALLBACKS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
343    }
344
345    /// Add `n` to a counter (relaxed atomic).
346    pub fn add(counter: &std::sync::atomic::AtomicU64, n: u64) {
347        counter.fetch_add(n, std::sync::atomic::Ordering::Relaxed);
348    }
349
350    /// Read all counters into a plain-integer `CountersSnapshot`.
351    pub fn snapshot(&self) -> CountersSnapshot {
352        use std::sync::atomic::Ordering::Relaxed;
353        CountersSnapshot {
354            stat_fetch_failures: self.stat_fetch_failures.load(Relaxed),
355            usn_batches_truncated: self.usn_batches_truncated.load(Relaxed),
356            snapshot_load_failures: self.snapshot_load_failures.load(Relaxed),
357            snapshot_save_failures: self.snapshot_save_failures.load(Relaxed),
358            deferred_names_unresolved: self.deferred_names_unresolved.load(Relaxed),
359            corrupt_mft_records: self.corrupt_mft_records.load(Relaxed),
360            journal_rescans: self.journal_rescans.load(Relaxed),
361            scan_pipeline_fallbacks: self.scan_pipeline_fallbacks.load(Relaxed),
362            offset_table_rebuild_fallbacks: OFFSET_TABLE_REBUILD_FALLBACKS.load(Relaxed),
363            lazy_perm_rebuild_fallbacks: LAZY_PERM_REBUILD_FALLBACKS.load(Relaxed),
364            compaction_aborts: self.compaction_aborts.load(Relaxed),
365            pipe_malformed_frames: self.pipe_malformed_frames.load(Relaxed),
366            pipe_events_dropped: self.pipe_events_dropped.load(Relaxed),
367            pipe_connections_rejected: self.pipe_connections_rejected.load(Relaxed),
368            deferred_name_cache_overflow: self.deferred_name_cache_overflow.load(Relaxed),
369            deferred_name_read_failures: self.deferred_name_read_failures.load(Relaxed),
370            pipe_results_evicted: self.pipe_results_evicted.load(Relaxed),
371            trace_serialize_failures: self.trace_serialize_failures.load(Relaxed),
372            walk_read_errors: self.walk_read_errors.load(Relaxed),
373            walk_depth_truncated: self.walk_depth_truncated.load(Relaxed),
374        }
375    }
376}
377
378/// Aggregated, JSON-serializable snapshot for the FFI/UI.
379#[derive(Clone, Debug, Default, Serialize)]
380pub struct MetricsSnapshot {
381    /// Most recent query traces, newest last.
382    pub recent_queries: Vec<QueryTrace>,
383    /// Latency histogram across all recorded queries.
384    pub query_histogram: Histogram,
385    /// 50th-percentile query latency, in microseconds.
386    pub p50_us: u64,
387    /// 99th-percentile query latency, in microseconds.
388    pub p99_us: u64,
389    /// Most recent applied USN batches.
390    pub recent_usn: Vec<UsnTrace>,
391    /// Most recent index-established (scan/snapshot) events.
392    pub scans: Vec<ScanTrace>,
393    /// Per-volume memory accounting for each live index.
394    pub indexes: Vec<IndexStats>,
395    /// Process-wide degradation counters.
396    pub counters: CountersSnapshot,
397    /// WARN+ events and panics (diag ring), oldest first.
398    pub recent_errors: Vec<crate::diag::ErrorEvent>,
399}
400
/// Capacity of the query-trace ring.
const RING: usize = 256;
/// Capacity of the USN-batch trace ring.
const USN_RING: usize = 64;
/// Capacity of the scan/snapshot trace ring.
const SCAN_RING: usize = 64;
404
405/// Thread-safe metrics collector owned by the engine.
406#[derive(Default)]
407pub struct MetricsHub {
408    queries: Mutex<VecDeque<QueryTrace>>,
409    histogram: Mutex<Histogram>,
410    usn: Mutex<VecDeque<UsnTrace>>,
411    scans: Mutex<VecDeque<ScanTrace>>,
412    /// Process-wide degradation counters.
413    pub counters: Counters,
414}
415
416impl MetricsHub {
417    /// Create an empty hub with a fresh 32-bucket histogram.
418    #[must_use]
419    pub fn new() -> Self {
420        Self {
421            histogram: Mutex::new(Histogram::new()),
422            ..Default::default()
423        }
424    }
425
426    /// Record a query trace into the ring and its latency into the histogram.
427    pub fn record_query(&self, trace: QueryTrace) {
428        self.histogram.lock().record(trace.total_us);
429        let mut q = self.queries.lock();
430        if q.len() == RING {
431            q.pop_front();
432        }
433        q.push_back(trace);
434    }
435
436    /// Record a USN-batch trace into the ring.
437    pub fn record_usn(&self, trace: UsnTrace) {
438        let mut u = self.usn.lock();
439        if u.len() == USN_RING {
440            u.pop_front();
441        }
442        u.push_back(trace);
443    }
444
445    /// Record an index-established (scan/snapshot) trace into the ring.
446    pub fn record_scan(&self, trace: ScanTrace) {
447        let mut s = self.scans.lock();
448        if s.len() == SCAN_RING {
449            s.pop_front();
450        }
451        s.push_back(trace);
452    }
453
454    /// The most recently recorded query trace, if any.
455    pub fn last_query(&self) -> Option<QueryTrace> {
456        self.queries.lock().back().cloned()
457    }
458
459    /// Snapshot with the most recent `recent` queries (newest last).
460    pub fn snapshot(&self, recent: usize, indexes: Vec<IndexStats>) -> MetricsSnapshot {
461        let hist = self.histogram.lock().clone();
462        MetricsSnapshot {
463            recent_queries: {
464                let q = self.queries.lock();
465                q.iter().rev().take(recent).rev().cloned().collect()
466            },
467            p50_us: hist.percentile_us(0.50),
468            p99_us: hist.percentile_us(0.99),
469            query_histogram: hist,
470            recent_usn: self.usn.lock().iter().cloned().collect(),
471            scans: self.scans.lock().iter().cloned().collect(),
472            indexes,
473            counters: self.counters.snapshot(),
474            recent_errors: crate::diag::recent_errors(),
475        }
476    }
477}
478
/// Microsecond stopwatch for stage timing.
pub struct Stage(std::time::Instant);

impl Stage {
    /// Start a stopwatch at the current instant.
    #[must_use]
    pub fn start() -> Self {
        Self(std::time::Instant::now())
    }

    /// Elapsed µs and restart — chain stages with one clock.
    ///
    /// The clock is read exactly once: the instant that ends this lap is
    /// the instant that starts the next one, so no time falls between
    /// consecutive laps.
    pub fn lap(&mut self) -> u64 {
        let now = std::time::Instant::now();
        let us = Self::micros(now.duration_since(self.0));
        self.0 = now;
        us
    }

    /// Elapsed time since start (or last lap), in microseconds.
    #[must_use]
    pub fn elapsed_us(&self) -> u64 {
        Self::micros(self.0.elapsed())
    }

    /// Whole microseconds in `d`, saturating at `u64::MAX` instead of
    /// silently truncating the `u128` that `as_micros` returns.
    fn micros(d: std::time::Duration) -> u64 {
        u64::try_from(d.as_micros()).unwrap_or(u64::MAX)
    }
}
502
#[cfg(test)]
mod tests {
    use super::*;

    /// Six samples spread over several buckets: totals are exact, p50 sits
    /// in a small bucket and p99 in the bucket that holds the maximum.
    #[test]
    fn histogram_buckets_and_percentiles() {
        let mut hist = Histogram::new();
        [1u64, 2, 3, 100, 1000, 10_000]
            .iter()
            .for_each(|&us| hist.record(us));

        assert_eq!(hist.count, 6);
        assert_eq!(hist.max_us, 10_000);
        // p50 lands in a small bucket, p99 near the max bucket.
        let p50 = hist.percentile_us(0.5);
        let p99 = hist.percentile_us(0.99);
        assert!(p50 <= 8);
        assert!(p99 >= 8192);
    }

    /// Recording more traces than a ring holds keeps only the newest ones,
    /// while the histogram still counts every query.
    #[test]
    fn ring_buffer_caps() {
        let hub = MetricsHub::new();
        (0..300u64).for_each(|i| {
            let query = QueryTrace {
                total_us: i,
                ..Default::default()
            };
            let scan = ScanTrace {
                total_ms: i,
                ..Default::default()
            };
            hub.record_query(query);
            hub.record_scan(scan);
        });

        let snap = hub.snapshot(16, Vec::new());
        assert_eq!(snap.query_histogram.count, 300);
        assert_eq!(snap.recent_queries.len(), 16);
        // Newest last.
        let newest_query = snap.recent_queries.last().unwrap();
        assert_eq!(newest_query.total_us, 299);
        // Scans are a ring too — a long-lived process must not grow it.
        assert_eq!(snap.scans.len(), SCAN_RING);
        let newest_scan = snap.scans.last().unwrap();
        assert_eq!(newest_scan.total_ms, 299);
    }
}
542}