1use rayon::prelude::*;
15
16use super::{EntryId, Frn, RecordNo, flags};
17
18#[derive(Default)]
19pub(super) struct FrnIndex {
20 ids: Vec<EntryId>,
23 covers: u32,
26}
27
28#[inline]
29fn is_live(flag: &[u8], id: EntryId) -> bool {
30 flag[id as usize] & flags::TOMBSTONE == 0
31}
32
33impl FrnIndex {
34 pub(super) fn build(frn: &[u64], flag: &[u8]) -> Self {
37 let mut pairs: Vec<(RecordNo, EntryId)> = (0..frn.len() as u32)
38 .filter(|&id| is_live(flag, id))
39 .map(|id| (Frn(frn[id as usize]).record(), id))
40 .collect();
41 pairs.par_sort_unstable();
42 Self {
43 ids: pairs.iter().map(|p| p.1).collect(),
44 covers: frn.len() as u32,
45 }
46 }
47
48 pub(super) fn lookup(&self, key: RecordNo, frn: &[u64], flag: &[u8]) -> Option<EntryId> {
50 for id in (self.covers..frn.len() as u32).rev() {
53 if Frn(frn[id as usize]).record() == key && is_live(flag, id) {
54 return Some(id);
55 }
56 }
57 let key_of = |id: EntryId| Frn(frn[id as usize]).record();
58 let start = self.ids.partition_point(|&id| key_of(id) < key);
59 for &id in &self.ids[start..] {
60 if key_of(id) != key {
61 break;
62 }
63 if is_live(flag, id) {
64 return Some(id);
65 }
66 }
67 None
68 }
69
70 pub(super) fn merge_appended(&mut self, frn: &[u64], flag: &[u8]) {
78 let n = frn.len() as u32;
79 let mut batch: Vec<(RecordNo, EntryId)> = (self.covers..n)
80 .filter(|&id| is_live(flag, id))
81 .map(|id| (Frn(frn[id as usize]).record(), id))
82 .collect();
83 self.covers = n;
84 if batch.is_empty() {
85 return;
86 }
87 batch.sort_unstable();
88
89 let old = self.ids.len();
90 super::reserve_bounded(&mut self.ids, batch.len());
91 self.ids.resize(old + batch.len(), 0);
92 let key_of = |id: EntryId| Frn(frn[id as usize]).record();
93 let mut hi = old; let mut k = old + batch.len(); for j in (0..batch.len()).rev() {
96 let (key, id) = batch[j];
97 let pos = self.ids[..hi].partition_point(|&oid| key_of(oid) <= key);
98 let seg = hi - pos;
99 self.ids.copy_within(pos..hi, k - seg);
100 k -= seg + 1;
101 self.ids[k] = id;
102 hi = pos;
103 }
104 debug_assert_eq!(k, hi, "merge cursors must close");
105 }
106
107 pub(super) fn compact(&self, remap: &[EntryId], new_len: u32) -> Self {
112 debug_assert_eq!(
113 self.covers as usize,
114 remap.len(),
115 "compact at a batch boundary only (unmerged tail would be lost)"
116 );
117 Self {
118 ids: self
119 .ids
120 .iter()
121 .filter_map(|&id| match remap[id as usize] {
122 super::NO_PARENT => None,
123 new_id => Some(new_id),
124 })
125 .collect(),
126 covers: new_len,
127 }
128 }
129
130 pub(super) const fn bytes(&self) -> u64 {
131 (self.ids.capacity() * 4) as u64
132 }
133
134 pub(super) fn shrink_to_fit(&mut self) {
135 self.ids.shrink_to_fit();
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use super::EntryId;
142 use crate::index::RecordNo;
143 use crate::index::testutil::{build_sample, raw, u16s};
144
145 fn forward_merge_reference(
148 keys: &mut Vec<RecordNo>,
149 ids: &mut Vec<EntryId>,
150 batch: &[(RecordNo, EntryId)],
151 ) {
152 let mut nk = Vec::with_capacity(keys.len() + batch.len());
153 let mut ni = Vec::with_capacity(ids.len() + batch.len());
154 let (mut i, mut j) = (0, 0);
155 while i < keys.len() && j < batch.len() {
156 if keys[i] <= batch[j].0 {
157 nk.push(keys[i]);
158 ni.push(ids[i]);
159 i += 1;
160 } else {
161 nk.push(batch[j].0);
162 ni.push(batch[j].1);
163 j += 1;
164 }
165 }
166 nk.extend_from_slice(&keys[i..]);
167 ni.extend_from_slice(&ids[i..]);
168 for &(k, id) in &batch[j..] {
169 nk.push(k);
170 ni.push(id);
171 }
172 *keys = nk;
173 *ids = ni;
174 }
175
176 #[test]
184 fn in_place_merge_is_byte_identical_to_the_forward_reference() {
185 let mut idx = build_sample();
186 let mut ref_ids = idx.frn_index.ids.clone();
187 let mut ref_keys: Vec<RecordNo> = ref_ids.iter().map(|&id| idx.frn(id).record()).collect();
188 let mut state = 0x1234_5678_9ABC_DEF0u64;
189 let mut rng = move || {
190 state ^= state << 13;
191 state ^= state >> 7;
192 state ^= state << 17;
193 state
194 };
195
196 for _ in 0..100 {
197 let first_new = idx.len() as u32;
198 for _ in 0..=(rng() % 8) {
199 let record = 100 + rng() % 40;
200 if rng() % 3 < 2 {
201 let name = u16s(&format!("f{}.txt", rng() % 1000));
202 idx.upsert(&raw(record, 50, &name, false, 1, 1));
203 } else {
204 idx.delete(record);
205 }
206 }
207 let mut batch: Vec<(RecordNo, EntryId)> = (first_new..idx.len() as u32)
209 .filter(|&id| idx.is_live(id))
210 .map(|id| (idx.frn(id).record(), id))
211 .collect();
212 batch.sort_unstable();
213 forward_merge_reference(&mut ref_keys, &mut ref_ids, &batch);
214
215 idx.merge_new_into_permutations(first_new);
216 assert_eq!(idx.frn_index.ids, ref_ids);
217 let derived_keys: Vec<RecordNo> = idx
218 .frn_index
219 .ids
220 .iter()
221 .map(|&id| idx.frn(id).record())
222 .collect();
223 assert_eq!(derived_keys, ref_keys);
224 }
225 }
226
227 #[test]
231 fn rename_storm_resolves_to_the_latest_before_and_after_merge() {
232 let mut idx = build_sample();
233 let live_before = idx.live_len();
234 let first_new = idx.len() as u32;
235 let mut last = None;
236 for i in 0..5u64 {
237 let name = u16s(&format!("storm_{i}.txt"));
238 last = Some(idx.upsert(&raw(100, 50, &name, false, 1, i as i64)));
239 }
240 let tail_hit = idx.entry_by_record(100).unwrap();
241 assert_eq!(Some(tail_hit), last, "tail scan must see the newest");
242 assert_eq!(idx.name(tail_hit), b"storm_4.txt");
243
244 idx.merge_new_into_permutations(first_new);
245 assert_eq!(idx.entry_by_record(100), last, "sorted lookup agrees");
246 assert_eq!(idx.live_len(), live_before, "storm nets zero live change");
247 }
248
249 #[test]
252 fn record_reuse_after_delete_resolves_to_the_new_entry() {
253 let mut idx = build_sample();
254 idx.delete(60).unwrap();
255 idx.merge_new_into_permutations(idx.len() as u32);
256 assert_eq!(idx.entry_by_record(60), None, "deleted record misses");
257
258 let first_new = idx.len() as u32;
259 let name = u16s("reborn.txt");
260 let id = idx.upsert(&raw(60, 50, &name, false, 7, 7));
261 assert_eq!(idx.entry_by_record(60), Some(id), "tail finds the rebirth");
262 idx.merge_new_into_permutations(first_new);
263 assert_eq!(idx.entry_by_record(60), Some(id), "merge keeps it");
264 assert_eq!(idx.name(id), b"reborn.txt");
265 }
266
267 #[test]
270 fn create_then_delete_within_a_batch_stays_gone() {
271 let mut idx = build_sample();
272 let first_new = idx.len() as u32;
273 let name = u16s("flash.tmp");
274 idx.upsert(&raw(777, 50, &name, false, 1, 1));
275 assert!(idx.entry_by_record(777).is_some());
276 idx.delete(777).unwrap();
277 assert_eq!(idx.entry_by_record(777), None, "gone in the tail");
278 idx.merge_new_into_permutations(first_new);
279 assert_eq!(idx.entry_by_record(777), None, "gone after the merge");
280 }
281}