Skip to content

Commit 3c8d4e0

Browse files
committed
std: file system watching for linux
1 parent a870228 commit 3c8d4e0

File tree

6 files changed

+368
-35
lines changed

6 files changed

+368
-35
lines changed

std/event/fs.zig

Lines changed: 206 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,18 @@ pub const Request = struct {
2020
PWriteV: PWriteV,
2121
PReadV: PReadV,
2222
OpenRead: OpenRead,
23+
OpenRW: OpenRW,
2324
Close: Close,
2425
WriteFile: WriteFile,
2526
End, // special - means the fs thread should exit
2627

2728
pub const PWriteV = struct {
2829
fd: os.FileHandle,
29-
data: []const []const u8,
30+
iov: []os.linux.iovec_const,
3031
offset: usize,
3132
result: Error!void,
3233

33-
pub const Error = error{};
34+
pub const Error = os.File.WriteError;
3435
};
3536

3637
pub const PReadV = struct {
@@ -50,6 +51,15 @@ pub const Request = struct {
5051
pub const Error = os.File.OpenError;
5152
};
5253

54+
pub const OpenRW = struct {
55+
/// must be null terminated. TODO https://github.com/ziglang/zig/issues/265
56+
path: []const u8,
57+
result: Error!os.FileHandle,
58+
mode: os.File.Mode,
59+
60+
pub const Error = os.File.OpenError;
61+
};
62+
5363
pub const WriteFile = struct {
5464
/// must be null terminated. TODO https://github.com/ziglang/zig/issues/265
5565
path: []const u8,
@@ -66,7 +76,7 @@ pub const Request = struct {
6676
};
6777
};
6878

69-
/// data - both the outer and inner references - must live until pwritev promise completes.
79+
/// data - just the inner references - must live until pwritev promise completes.
7080
pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: []const []const u8) !void {
7181
//const data_dupe = try mem.dupe(loop.allocator, []const u8, data);
7282
//defer loop.allocator.free(data_dupe);
@@ -78,13 +88,23 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data:
7888
resume p;
7989
}
8090

91+
const iovecs = try loop.allocator.alloc(os.linux.iovec_const, data.len);
92+
defer loop.allocator.free(iovecs);
93+
94+
for (data) |buf, i| {
95+
iovecs[i] = os.linux.iovec_const{
96+
.iov_base = buf.ptr,
97+
.iov_len = buf.len,
98+
};
99+
}
100+
81101
var req_node = RequestNode{
82102
.next = undefined,
83103
.data = Request{
84104
.msg = Request.Msg{
85105
.PWriteV = Request.Msg.PWriteV{
86106
.fd = fd,
87-
.data = data,
107+
.iov = iovecs,
88108
.offset = offset,
89109
.result = undefined,
90110
},
@@ -162,12 +182,15 @@ pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.
162182
resume p;
163183
}
164184

185+
const path_with_null = try std.cstr.addNullByte(loop.allocator, path);
186+
defer loop.allocator.free(path_with_null);
187+
165188
var req_node = RequestNode{
166189
.next = undefined,
167190
.data = Request{
168191
.msg = Request.Msg{
169192
.OpenRead = Request.Msg.OpenRead{
170-
.path = path,
193+
.path = path_with_null[0..path.len],
171194
.result = undefined,
172195
},
173196
},
@@ -187,6 +210,48 @@ pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.
187210
return req_node.data.msg.OpenRead.result;
188211
}
189212

213+
/// Creates if does not exist. Does not truncate.
214+
pub async fn openReadWrite(
215+
loop: *event.Loop,
216+
path: []const u8,
217+
mode: os.File.Mode,
218+
) os.File.OpenError!os.FileHandle {
219+
// workaround for https://github.com/ziglang/zig/issues/1194
220+
var my_handle: promise = undefined;
221+
suspend |p| {
222+
my_handle = p;
223+
resume p;
224+
}
225+
226+
const path_with_null = try std.cstr.addNullByte(loop.allocator, path);
227+
defer loop.allocator.free(path_with_null);
228+
229+
var req_node = RequestNode{
230+
.next = undefined,
231+
.data = Request{
232+
.msg = Request.Msg{
233+
.OpenRW = Request.Msg.OpenRW{
234+
.path = path_with_null[0..path.len],
235+
.mode = mode,
236+
.result = undefined,
237+
},
238+
},
239+
.finish = Request.Finish{
240+
.TickNode = event.Loop.NextTickNode{
241+
.next = undefined,
242+
.data = my_handle,
243+
},
244+
},
245+
},
246+
};
247+
248+
suspend |_| {
249+
loop.linuxFsRequest(&req_node);
250+
}
251+
252+
return req_node.data.msg.OpenRW.result;
253+
}
254+
190255
/// This abstraction helps to close file handles in defer expressions
191256
/// without suspending. Start a CloseOperation before opening a file.
192257
pub const CloseOperation = struct {
@@ -302,6 +367,113 @@ pub async fn readFile(loop: *event.Loop, file_path: []const u8, max_size: usize)
302367
}
303368
}
304369

370+
pub const Watch = struct {
371+
channel: *event.Channel(Event),
372+
putter: promise,
373+
374+
pub const Event = union(enum) {
375+
CloseWrite,
376+
Err: Error,
377+
};
378+
379+
pub const Error = error{
380+
UserResourceLimitReached,
381+
SystemResources,
382+
};
383+
384+
pub fn destroy(self: *Watch) void {
385+
// TODO https://github.com/ziglang/zig/issues/1261
386+
cancel self.putter;
387+
}
388+
};
389+
390+
pub fn watchFile(loop: *event.Loop, file_path: []const u8) !*Watch {
391+
const path_with_null = try std.cstr.addNullByte(loop.allocator, file_path);
392+
defer loop.allocator.free(path_with_null);
393+
394+
const inotify_fd = try os.linuxINotifyInit1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC);
395+
errdefer os.close(inotify_fd);
396+
397+
const wd = try os.linuxINotifyAddWatchC(inotify_fd, path_with_null.ptr, os.linux.IN_CLOSE_WRITE);
398+
errdefer os.close(wd);
399+
400+
const channel = try event.Channel(Watch.Event).create(loop, 0);
401+
errdefer channel.destroy();
402+
403+
var result: *Watch = undefined;
404+
_ = try async<loop.allocator> watchEventPutter(inotify_fd, wd, channel, &result);
405+
return result;
406+
}
407+
408+
async fn watchEventPutter(inotify_fd: i32, wd: i32, channel: *event.Channel(Watch.Event), out_watch: **Watch) void {
409+
// TODO https://github.com/ziglang/zig/issues/1194
410+
var my_handle: promise = undefined;
411+
suspend |p| {
412+
my_handle = p;
413+
resume p;
414+
}
415+
416+
var watch = Watch{
417+
.putter = my_handle,
418+
.channel = channel,
419+
};
420+
out_watch.* = &watch;
421+
422+
const loop = channel.loop;
423+
loop.beginOneEvent();
424+
425+
defer {
426+
channel.destroy();
427+
os.close(wd);
428+
os.close(inotify_fd);
429+
loop.finishOneEvent();
430+
}
431+
432+
var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined;
433+
434+
while (true) {
435+
const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len);
436+
const errno = os.linux.getErrno(rc);
437+
switch (errno) {
438+
0 => {
439+
// can't use @bytesToSlice because of the special variable length name field
440+
var ptr = event_buf[0..].ptr;
441+
const end_ptr = ptr + event_buf.len;
442+
var ev: *os.linux.inotify_event = undefined;
443+
while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += @sizeOf(os.linux.inotify_event) + ev.len) {
444+
ev = @ptrCast(*os.linux.inotify_event, ptr);
445+
if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
446+
await (async channel.put(Watch.Event.CloseWrite) catch unreachable);
447+
}
448+
}
449+
},
450+
os.linux.EINTR => continue,
451+
os.linux.EINVAL => unreachable,
452+
os.linux.EFAULT => unreachable,
453+
os.linux.EAGAIN => {
454+
(await (async loop.linuxWaitFd(
455+
inotify_fd,
456+
os.linux.EPOLLET | os.linux.EPOLLIN,
457+
) catch unreachable)) catch |err| {
458+
const transformed_err = switch (err) {
459+
error.InvalidFileDescriptor => unreachable,
460+
error.FileDescriptorAlreadyPresentInSet => unreachable,
461+
error.InvalidSyscall => unreachable,
462+
error.OperationCausesCircularLoop => unreachable,
463+
error.FileDescriptorNotRegistered => unreachable,
464+
error.SystemResources => error.SystemResources,
465+
error.UserResourceLimitReached => error.UserResourceLimitReached,
466+
error.FileDescriptorIncompatibleWithEpoll => unreachable,
467+
error.Unexpected => unreachable,
468+
};
469+
await (async channel.put(Watch.Event{ .Err = transformed_err }) catch unreachable);
470+
};
471+
},
472+
else => unreachable,
473+
}
474+
}
475+
}
476+
305477
const test_tmp_dir = "std_event_fs_test";
306478

307479
test "write a file, watch it, write it again" {
@@ -338,10 +510,39 @@ async fn testFsWatch(loop: *event.Loop) !void {
338510
\\line 1
339511
\\line 2
340512
;
513+
const line2_offset = 7;
341514

342515
// first just write then read the file
343516
try await try async writeFile(loop, file_path, contents);
344517

345518
const read_contents = try await try async readFile(loop, file_path, 1024 * 1024);
346519
assert(mem.eql(u8, read_contents, contents));
520+
521+
// now watch the file
522+
var watch = try watchFile(loop, file_path);
523+
defer watch.destroy();
524+
525+
const ev = try async watch.channel.get();
526+
var ev_consumed = false;
527+
defer if (!ev_consumed) cancel ev;
528+
529+
// overwrite line 2
530+
const fd = try await try async openReadWrite(loop, file_path, os.File.default_mode);
531+
{
532+
defer os.close(fd);
533+
534+
try await try async pwritev(loop, fd, line2_offset, []const []const u8{"lorem ipsum"});
535+
}
536+
537+
ev_consumed = true;
538+
switch (await ev) {
539+
Watch.Event.CloseWrite => {},
540+
Watch.Event.Err => |err| return err,
541+
}
542+
543+
const contents_updated = try await try async readFile(loop, file_path, 1024 * 1024);
544+
assert(mem.eql(u8, contents_updated,
545+
\\line 1
546+
\\lorem ipsum
547+
));
347548
}

0 commit comments

Comments
 (0)