10
10
use std:: {
11
11
fs,
12
12
path:: { Component , Path } ,
13
+ sync:: atomic:: AtomicUsize ,
13
14
} ;
14
15
15
16
use crossbeam_channel:: { never, select, unbounded, Receiver , Sender } ;
16
- use notify:: { Config , RecommendedWatcher , RecursiveMode , Watcher } ;
17
+ use notify:: { Config , EventKind , RecommendedWatcher , RecursiveMode , Watcher } ;
17
18
use paths:: { AbsPath , AbsPathBuf , Utf8PathBuf } ;
18
- use vfs:: loader;
19
+ use rayon:: iter:: { IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator } ;
20
+ use vfs:: loader:: { self , LoadingProgress } ;
19
21
use walkdir:: WalkDir ;
20
22
21
23
#[ derive( Debug ) ]
@@ -88,7 +90,7 @@ impl NotifyActor {
88
90
tracing:: debug!( ?event, "vfs-notify event" ) ;
89
91
match event {
90
92
Event :: Message ( msg) => match msg {
91
- Message :: Config ( config) => {
93
+ Message :: Config ( mut config) => {
92
94
self . watcher = None ;
93
95
if !config. watch . is_empty ( ) {
94
96
let ( watcher_sender, watcher_receiver) = unbounded ( ) ;
@@ -104,35 +106,61 @@ impl NotifyActor {
104
106
let config_version = config. version ;
105
107
106
108
let n_total = config. load . len ( ) ;
107
- self . send ( loader:: Message :: Progress {
109
+ self . watched_entries . clear ( ) ;
110
+
111
+ let send = |msg| ( self . sender ) ( msg) ;
112
+ send ( loader:: Message :: Progress {
108
113
n_total,
109
- n_done : None ,
114
+ n_done : LoadingProgress :: Started ,
110
115
config_version,
111
116
dir : None ,
112
117
} ) ;
113
118
114
- self . watched_entries . clear ( ) ;
115
-
116
- for ( i, entry) in config. load . into_iter ( ) . enumerate ( ) {
117
- let watch = config. watch . contains ( & i) ;
118
- if watch {
119
- self . watched_entries . push ( entry. clone ( ) ) ;
119
+ let ( entry_tx, entry_rx) = unbounded ( ) ;
120
+ let ( watch_tx, watch_rx) = unbounded ( ) ;
121
+ let processed = AtomicUsize :: new ( 0 ) ;
122
+ config. load . into_par_iter ( ) . enumerate ( ) . for_each ( move |( i, entry) | {
123
+ let do_watch = config. watch . contains ( & i) ;
124
+ if do_watch {
125
+ _ = entry_tx. send ( entry. clone ( ) ) ;
120
126
}
121
- let files =
122
- self . load_entry ( entry, watch, |file| loader:: Message :: Progress {
123
- n_total,
124
- n_done : Some ( i) ,
125
- dir : Some ( file) ,
126
- config_version,
127
- } ) ;
128
- self . send ( loader:: Message :: Loaded { files } ) ;
129
- self . send ( loader:: Message :: Progress {
127
+ let files = Self :: load_entry (
128
+ |f| _ = watch_tx. send ( f. to_owned ( ) ) ,
129
+ entry,
130
+ do_watch,
131
+ |file| {
132
+ send ( loader:: Message :: Progress {
133
+ n_total,
134
+ n_done : LoadingProgress :: Progress (
135
+ processed. load ( std:: sync:: atomic:: Ordering :: Relaxed ) ,
136
+ ) ,
137
+ dir : Some ( file) ,
138
+ config_version,
139
+ } )
140
+ } ,
141
+ ) ;
142
+ send ( loader:: Message :: Loaded { files } ) ;
143
+ send ( loader:: Message :: Progress {
130
144
n_total,
131
- n_done : Some ( i + 1 ) ,
145
+ n_done : LoadingProgress :: Progress (
146
+ processed. fetch_add ( 1 , std:: sync:: atomic:: Ordering :: AcqRel ) + 1 ,
147
+ ) ,
132
148
config_version,
133
149
dir : None ,
134
150
} ) ;
151
+ } ) ;
152
+ for path in watch_rx {
153
+ self . watch ( & path) ;
154
+ }
155
+ for entry in entry_rx {
156
+ self . watched_entries . push ( entry) ;
135
157
}
158
+ self . send ( loader:: Message :: Progress {
159
+ n_total,
160
+ n_done : LoadingProgress :: Finished ,
161
+ config_version,
162
+ dir : None ,
163
+ } ) ;
136
164
}
137
165
Message :: Invalidate ( path) => {
138
166
let contents = read ( path. as_path ( ) ) ;
@@ -142,60 +170,67 @@ impl NotifyActor {
142
170
} ,
143
171
Event :: NotifyEvent ( event) => {
144
172
if let Some ( event) = log_notify_error ( event) {
145
- let files = event
146
- . paths
147
- . into_iter ( )
148
- . filter_map ( |path| {
149
- Some (
150
- AbsPathBuf :: try_from ( Utf8PathBuf :: from_path_buf ( path) . ok ( ) ?)
173
+ if let EventKind :: Create ( _) | EventKind :: Modify ( _) | EventKind :: Remove ( _) =
174
+ event. kind
175
+ {
176
+ let files = event
177
+ . paths
178
+ . into_iter ( )
179
+ . filter_map ( |path| {
180
+ Some (
181
+ AbsPathBuf :: try_from (
182
+ Utf8PathBuf :: from_path_buf ( path) . ok ( ) ?,
183
+ )
151
184
. expect ( "path is absolute" ) ,
152
- )
153
- } )
154
- . filter_map ( |path| {
155
- let meta = fs:: metadata ( & path) . ok ( ) ?;
156
- if meta. file_type ( ) . is_dir ( )
157
- && self
185
+ )
186
+ } )
187
+ . filter_map ( |path| {
188
+ let meta = fs:: metadata ( & path) . ok ( ) ?;
189
+ if meta. file_type ( ) . is_dir ( )
190
+ && self
191
+ . watched_entries
192
+ . iter ( )
193
+ . any ( |entry| entry. contains_dir ( & path) )
194
+ {
195
+ self . watch ( path. as_ref ( ) ) ;
196
+ return None ;
197
+ }
198
+
199
+ if !meta. file_type ( ) . is_file ( ) {
200
+ return None ;
201
+ }
202
+ if !self
158
203
. watched_entries
159
204
. iter ( )
160
- . any ( |entry| entry. contains_dir ( & path) )
161
- {
162
- self . watch ( path) ;
163
- return None ;
164
- }
165
-
166
- if !meta. file_type ( ) . is_file ( ) {
167
- return None ;
168
- }
169
- if !self
170
- . watched_entries
171
- . iter ( )
172
- . any ( |entry| entry. contains_file ( & path) )
173
- {
174
- return None ;
175
- }
176
-
177
- let contents = read ( & path) ;
178
- Some ( ( path, contents) )
179
- } )
180
- . collect ( ) ;
181
- self . send ( loader:: Message :: Changed { files } ) ;
205
+ . any ( |entry| entry. contains_file ( & path) )
206
+ {
207
+ return None ;
208
+ }
209
+
210
+ let contents = read ( & path) ;
211
+ Some ( ( path, contents) )
212
+ } )
213
+ . collect ( ) ;
214
+ self . send ( loader:: Message :: Changed { files } ) ;
215
+ }
182
216
}
183
217
}
184
218
}
185
219
}
186
220
}
221
+
187
222
fn load_entry (
188
- & mut self ,
223
+ mut watch : impl FnMut ( & Path ) ,
189
224
entry : loader:: Entry ,
190
- watch : bool ,
191
- make_message : impl Fn ( AbsPathBuf ) -> loader :: Message ,
225
+ do_watch : bool ,
226
+ send_message : impl Fn ( AbsPathBuf ) ,
192
227
) -> Vec < ( AbsPathBuf , Option < Vec < u8 > > ) > {
193
228
match entry {
194
229
loader:: Entry :: Files ( files) => files
195
230
. into_iter ( )
196
231
. map ( |file| {
197
- if watch {
198
- self . watch ( file. clone ( ) ) ;
232
+ if do_watch {
233
+ watch ( file. as_ref ( ) ) ;
199
234
}
200
235
let contents = read ( file. as_path ( ) ) ;
201
236
( file, contents)
@@ -205,15 +240,15 @@ impl NotifyActor {
205
240
let mut res = Vec :: new ( ) ;
206
241
207
242
for root in & dirs. include {
208
- self . send ( make_message ( root. clone ( ) ) ) ;
243
+ send_message ( root. clone ( ) ) ;
209
244
let walkdir =
210
245
WalkDir :: new ( root) . follow_links ( true ) . into_iter ( ) . filter_entry ( |entry| {
211
246
if !entry. file_type ( ) . is_dir ( ) {
212
247
return true ;
213
248
}
214
249
let path = entry. path ( ) ;
215
250
216
- if path_is_parent_symlink ( path) {
251
+ if path_might_be_cyclic ( path) {
217
252
return false ;
218
253
}
219
254
@@ -230,10 +265,10 @@ impl NotifyActor {
230
265
)
231
266
. ok ( ) ?;
232
267
if depth < 2 && is_dir {
233
- self . send ( make_message ( abs_path. clone ( ) ) ) ;
268
+ send_message ( abs_path. clone ( ) ) ;
234
269
}
235
- if is_dir && watch {
236
- self . watch ( abs_path. clone ( ) ) ;
270
+ if is_dir && do_watch {
271
+ watch ( abs_path. as_ref ( ) ) ;
237
272
}
238
273
if !is_file {
239
274
return None ;
@@ -255,12 +290,13 @@ impl NotifyActor {
255
290
}
256
291
}
257
292
258
- fn watch ( & mut self , path : AbsPathBuf ) {
293
+ fn watch ( & mut self , path : & Path ) {
259
294
if let Some ( ( watcher, _) ) = & mut self . watcher {
260
- log_notify_error ( watcher. watch ( path. as_ref ( ) , RecursiveMode :: NonRecursive ) ) ;
295
+ log_notify_error ( watcher. watch ( path, RecursiveMode :: NonRecursive ) ) ;
261
296
}
262
297
}
263
- fn send ( & mut self , msg : loader:: Message ) {
298
+
299
+ fn send ( & self , msg : loader:: Message ) {
264
300
( self . sender ) ( msg) ;
265
301
}
266
302
}
@@ -279,7 +315,7 @@ fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> {
279
315
/// heuristic is not sufficient to catch all symlink cycles (it's
280
316
/// possible to construct cycle using two or more symlinks), but it
281
317
/// catches common cases.
282
- fn path_is_parent_symlink ( path : & Path ) -> bool {
318
+ fn path_might_be_cyclic ( path : & Path ) -> bool {
283
319
let Ok ( destination) = std:: fs:: read_link ( path) else {
284
320
return false ;
285
321
} ;
0 commit comments