@@ -93,6 +93,95 @@ impl FilesystemStore {
93
93
}
94
94
}
95
95
96
+ impl FilesystemStore {
97
+ fn write (
98
+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
99
+ ) -> Result < ( ) , lightning:: io:: Error > {
100
+ check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "write" ) ?;
101
+
102
+ let mut dest_file_path = self . get_dest_dir_path ( primary_namespace, secondary_namespace) ?;
103
+ dest_file_path. push ( key) ;
104
+
105
+ let parent_directory = dest_file_path. parent ( ) . ok_or_else ( || {
106
+ let msg =
107
+ format ! ( "Could not retrieve parent directory of {}." , dest_file_path. display( ) ) ;
108
+ std:: io:: Error :: new ( std:: io:: ErrorKind :: InvalidInput , msg)
109
+ } ) ?;
110
+ fs:: create_dir_all ( & parent_directory) ?;
111
+
112
+ // Do a crazy dance with lots of fsync()s to be overly cautious here...
113
+ // We never want to end up in a state where we've lost the old data, or end up using the
114
+ // old data on power loss after we've returned.
115
+ // The way to atomically write a file on Unix platforms is:
116
+ // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
117
+ let mut tmp_file_path = dest_file_path. clone ( ) ;
118
+ let tmp_file_ext = format ! ( "{}.tmp" , self . tmp_file_counter. fetch_add( 1 , Ordering :: AcqRel ) ) ;
119
+ tmp_file_path. set_extension ( tmp_file_ext) ;
120
+
121
+ {
122
+ let mut tmp_file = fs:: File :: create ( & tmp_file_path) ?;
123
+ tmp_file. write_all ( & buf) ?;
124
+ tmp_file. sync_all ( ) ?;
125
+ }
126
+
127
+ let res = {
128
+ let inner_lock_ref = {
129
+ let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
130
+ Arc :: clone ( & outer_lock. entry ( dest_file_path. clone ( ) ) . or_default ( ) )
131
+ } ;
132
+ let _guard = inner_lock_ref. write ( ) . unwrap ( ) ;
133
+
134
+ #[ cfg( not( target_os = "windows" ) ) ]
135
+ {
136
+ fs:: rename ( & tmp_file_path, & dest_file_path) ?;
137
+ let dir_file = fs:: OpenOptions :: new ( ) . read ( true ) . open ( & parent_directory) ?;
138
+ dir_file. sync_all ( ) ?;
139
+ Ok ( ( ) )
140
+ }
141
+
142
+ #[ cfg( target_os = "windows" ) ]
143
+ {
144
+ let res = if dest_file_path. exists ( ) {
145
+ call ! ( unsafe {
146
+ windows_sys:: Win32 :: Storage :: FileSystem :: ReplaceFileW (
147
+ path_to_windows_str( & dest_file_path) . as_ptr( ) ,
148
+ path_to_windows_str( & tmp_file_path) . as_ptr( ) ,
149
+ std:: ptr:: null( ) ,
150
+ windows_sys:: Win32 :: Storage :: FileSystem :: REPLACEFILE_IGNORE_MERGE_ERRORS ,
151
+ std:: ptr:: null_mut( ) as * const core:: ffi:: c_void,
152
+ std:: ptr:: null_mut( ) as * const core:: ffi:: c_void,
153
+ )
154
+ } )
155
+ } else {
156
+ call ! ( unsafe {
157
+ windows_sys:: Win32 :: Storage :: FileSystem :: MoveFileExW (
158
+ path_to_windows_str( & tmp_file_path) . as_ptr( ) ,
159
+ path_to_windows_str( & dest_file_path) . as_ptr( ) ,
160
+ windows_sys:: Win32 :: Storage :: FileSystem :: MOVEFILE_WRITE_THROUGH
161
+ | windows_sys:: Win32 :: Storage :: FileSystem :: MOVEFILE_REPLACE_EXISTING ,
162
+ )
163
+ } )
164
+ } ;
165
+
166
+ match res {
167
+ Ok ( ( ) ) => {
168
+ // We fsync the dest file in hopes this will also flush the metadata to disk.
169
+ let dest_file =
170
+ fs:: OpenOptions :: new ( ) . read ( true ) . write ( true ) . open ( & dest_file_path) ?;
171
+ dest_file. sync_all ( ) ?;
172
+ Ok ( ( ) )
173
+ } ,
174
+ Err ( e) => Err ( e. into ( ) ) ,
175
+ }
176
+ }
177
+ } ;
178
+
179
+ self . garbage_collect_locks ( ) ;
180
+
181
+ res
182
+ }
183
+ }
184
+
96
185
impl KVStore for FilesystemStore {
97
186
fn read (
98
187
& self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
@@ -122,99 +211,9 @@ impl KVStore for FilesystemStore {
122
211
fn write_async (
123
212
& self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
124
213
) -> AsyncResultType < ' static , ( ) , lightning:: io:: Error > {
125
- Box :: pin ( async move {
126
- check_namespace_key_validity (
127
- primary_namespace,
128
- secondary_namespace,
129
- Some ( key) ,
130
- "write" ,
131
- ) ?;
132
-
133
- let mut dest_file_path =
134
- self . get_dest_dir_path ( primary_namespace, secondary_namespace) ?;
135
- dest_file_path. push ( key) ;
136
-
137
- let parent_directory = dest_file_path. parent ( ) . ok_or_else ( || {
138
- let msg =
139
- format ! ( "Could not retrieve parent directory of {}." , dest_file_path. display( ) ) ;
140
- std:: io:: Error :: new ( std:: io:: ErrorKind :: InvalidInput , msg)
141
- } ) ?;
142
- fs:: create_dir_all ( & parent_directory) ?;
143
-
144
- // Do a crazy dance with lots of fsync()s to be overly cautious here...
145
- // We never want to end up in a state where we've lost the old data, or end up using the
146
- // old data on power loss after we've returned.
147
- // The way to atomically write a file on Unix platforms is:
148
- // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
149
- let mut tmp_file_path = dest_file_path. clone ( ) ;
150
- let tmp_file_ext =
151
- format ! ( "{}.tmp" , self . tmp_file_counter. fetch_add( 1 , Ordering :: AcqRel ) ) ;
152
- tmp_file_path. set_extension ( tmp_file_ext) ;
153
-
154
- {
155
- let mut tmp_file = fs:: File :: create ( & tmp_file_path) ?;
156
- tmp_file. write_all ( & buf) ?;
157
- tmp_file. sync_all ( ) ?;
158
- }
159
-
160
- let res = {
161
- let inner_lock_ref = {
162
- let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
163
- Arc :: clone ( & outer_lock. entry ( dest_file_path. clone ( ) ) . or_default ( ) )
164
- } ;
165
- let _guard = inner_lock_ref. write ( ) . unwrap ( ) ;
166
-
167
- #[ cfg( not( target_os = "windows" ) ) ]
168
- {
169
- fs:: rename ( & tmp_file_path, & dest_file_path) ?;
170
- let dir_file = fs:: OpenOptions :: new ( ) . read ( true ) . open ( & parent_directory) ?;
171
- dir_file. sync_all ( ) ?;
172
- Ok ( ( ) )
173
- }
174
-
175
- #[ cfg( target_os = "windows" ) ]
176
- {
177
- let res = if dest_file_path. exists ( ) {
178
- call ! ( unsafe {
179
- windows_sys:: Win32 :: Storage :: FileSystem :: ReplaceFileW (
180
- path_to_windows_str( & dest_file_path) . as_ptr( ) ,
181
- path_to_windows_str( & tmp_file_path) . as_ptr( ) ,
182
- std:: ptr:: null( ) ,
183
- windows_sys:: Win32 :: Storage :: FileSystem :: REPLACEFILE_IGNORE_MERGE_ERRORS ,
184
- std:: ptr:: null_mut( ) as * const core:: ffi:: c_void,
185
- std:: ptr:: null_mut( ) as * const core:: ffi:: c_void,
186
- )
187
- } )
188
- } else {
189
- call ! ( unsafe {
190
- windows_sys:: Win32 :: Storage :: FileSystem :: MoveFileExW (
191
- path_to_windows_str( & tmp_file_path) . as_ptr( ) ,
192
- path_to_windows_str( & dest_file_path) . as_ptr( ) ,
193
- windows_sys:: Win32 :: Storage :: FileSystem :: MOVEFILE_WRITE_THROUGH
194
- | windows_sys:: Win32 :: Storage :: FileSystem :: MOVEFILE_REPLACE_EXISTING ,
195
- )
196
- } )
197
- } ;
198
-
199
- match res {
200
- Ok ( ( ) ) => {
201
- // We fsync the dest file in hopes this will also flush the metadata to disk.
202
- let dest_file = fs:: OpenOptions :: new ( )
203
- . read ( true )
204
- . write ( true )
205
- . open ( & dest_file_path) ?;
206
- dest_file. sync_all ( ) ?;
207
- Ok ( ( ) )
208
- } ,
209
- Err ( e) => Err ( e. into ( ) ) ,
210
- }
211
- }
212
- } ;
213
-
214
- self . garbage_collect_locks ( ) ;
214
+ let res = self . write ( primary_namespace, secondary_namespace, key, buf) ;
215
215
216
- res
217
- } )
216
+ Box :: pin ( async move { res } )
218
217
}
219
218
220
219
fn remove (
0 commit comments