@@ -6,6 +6,7 @@ use std::sync::Arc;
66use dashmap:: DashMap ;
77use libsql_sys:: ffi:: Sqlite3DbHeader ;
88use parking_lot:: { Condvar , Mutex } ;
9+ use rand:: Rng ;
910use tokio:: sync:: { mpsc, Notify , Semaphore } ;
1011use tokio:: task:: JoinSet ;
1112use zerocopy:: { AsBytes , FromZeroes } ;
@@ -32,7 +33,7 @@ enum Slot<IO: Io> {
3233
3334/// Wal Registry maintains a set of shared Wal, and their respective set of files.
3435pub struct WalRegistry < IO : Io , S > {
35- fs : IO ,
36+ io : IO ,
3637 path : PathBuf ,
3738 shutdown : AtomicBool ,
3839 opened : DashMap < NamespaceName , Slot < IO > > ,
@@ -59,7 +60,7 @@ impl<IO: Io, S> WalRegistry<IO, S> {
5960 ) -> Result < Self > {
6061 io. create_dir_all ( & path) ?;
6162 let registry = Self {
62- fs : io,
63+ io,
6364 path,
6465 opened : Default :: default ( ) ,
6566 shutdown : Default :: default ( ) ,
@@ -105,14 +106,15 @@ where
105106 . join ( shared. namespace ( ) . as_str ( ) )
106107 . join ( format ! ( "{}:{start_frame_no:020}.seg" , shared. namespace( ) ) ) ;
107108
108- let segment_file = self . fs . open ( true , true , true , & path) ?;
109-
109+ let segment_file = self . io . open ( true , true , true , & path) ?;
110+ let salt = self . io . with_rng ( |rng| rng . gen ( ) ) ;
110111 let new = CurrentSegment :: create (
111112 segment_file,
112113 path,
113114 start_frame_no,
114115 current. db_size ( ) ,
115116 current. tail ( ) . clone ( ) ,
117+ salt,
116118 ) ?;
117119 // sealing must the last fallible operation, because we don't want to end up in a situation
118120 // where the current log is sealed and it wasn't swapped.
@@ -126,9 +128,8 @@ where
126128 update_durable ( fno, notifier, durable_frame_no, namespace) . await ;
127129 } )
128130 } ) ;
129- self . storage
130- . store ( & shared. namespace , sealed. clone ( ) , None , cb) ;
131- new. tail ( ) . push ( sealed) ;
131+ new. tail ( ) . push ( sealed. clone ( ) ) ;
132+ self . storage . store ( & shared. namespace , sealed, None , cb) ;
132133 }
133134
134135 shared. current . swap ( Arc :: new ( new) ) ;
@@ -226,7 +227,7 @@ where
226227 db_path : & Path ,
227228 ) -> Result < Arc < SharedWal < IO > > > {
228229 let path = self . path . join ( namespace. as_str ( ) ) ;
229- self . fs . create_dir_all ( & path) ?;
230+ self . io . create_dir_all ( & path) ?;
230231 // TODO: handle that with abstract io
231232 let dir = walkdir:: WalkDir :: new ( & path) . sort_by_file_name ( ) . into_iter ( ) ;
232233
@@ -246,7 +247,7 @@ where
246247 continue ;
247248 }
248249
249- let file = self . fs . open ( false , true , true , entry. path ( ) ) ?;
250+ let file = self . io . open ( false , true , true , entry. path ( ) ) ?;
250251
251252 if let Some ( sealed) =
252253 SealedSegment :: open ( file. into ( ) , entry. path ( ) . to_path_buf ( ) , Default :: default ( ) ) ?
@@ -265,7 +266,7 @@ where
265266 }
266267 }
267268
268- let db_file = self . fs . open ( false , true , true , db_path) ?;
269+ let db_file = self . io . open ( false , true , true , db_path) ?;
269270
270271 let mut header: Sqlite3DbHeader = Sqlite3DbHeader :: new_zeroed ( ) ;
271272 db_file. read_exact_at ( header. as_bytes_mut ( ) , 0 ) ?;
@@ -283,14 +284,16 @@ where
283284
284285 let current_path = path. join ( format ! ( "{namespace}:{next_frame_no:020}.seg" ) ) ;
285286
286- let segment_file = self . fs . open ( true , true , true , & current_path) ?;
287+ let segment_file = self . io . open ( true , true , true , & current_path) ?;
288+ let salt = self . io . with_rng ( |rng| rng. gen ( ) ) ;
287289
288290 let current = arc_swap:: ArcSwap :: new ( Arc :: new ( CurrentSegment :: create (
289291 segment_file,
290292 current_path,
291293 next_frame_no,
292294 db_size,
293295 tail. into ( ) ,
296+ salt,
294297 ) ?) ) ;
295298
296299 let ( new_frame_notifier, _) = tokio:: sync:: watch:: channel ( next_frame_no. get ( ) - 1 ) ;
@@ -309,6 +312,7 @@ where
309312 namespace. clone ( ) ,
310313 ) ) ,
311314 shutdown : false . into ( ) ,
315+ checkpoint_notifier : self . checkpoint_notifier . clone ( ) ,
312316 } ) ;
313317
314318 self . opened
0 commit comments