Skip to content

Commit 8941c9d

Browse files
authored
Test and improve Xapi periodic scheduler (#6155)
There was some issue in the current code, from structure corruption to thread safety. Add some test and fix discovered issues. More details on commit messages.
2 parents d8baca7 + 88dd4d9 commit 8941c9d

File tree

5 files changed

+163
-38
lines changed

5 files changed

+163
-38
lines changed

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(library
22
(public_name xapi-stdext-threads)
33
(name xapi_stdext_threads)
4-
(modules :standard \ ipq scheduler threadext_test ipq_test)
4+
(modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test)
55
(libraries
66
mtime
77
mtime.clock.os
@@ -22,8 +22,8 @@
2222
)
2323

2424
(tests
25-
(names threadext_test ipq_test)
25+
(names threadext_test ipq_test scheduler_test)
2626
(package xapi-stdext-threads)
27-
(modules threadext_test ipq_test)
27+
(modules threadext_test ipq_test scheduler_test)
2828
(libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler)
2929
)

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml

Lines changed: 56 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ let delay = Delay.make ()
2727

2828
let queue_default = {func= (fun () -> ()); ty= OneShot; name= ""}
2929

30+
let (pending_event : t option ref) = ref None
31+
3032
let (queue : t Ipq.t) = Ipq.create 50 queue_default
3133

3234
let lock = Mutex.create ()
@@ -48,48 +50,68 @@ module Clock = struct
4850
Mtime.min_stamp
4951
end
5052

51-
let add_to_queue ?(signal = true) name ty start newfunc =
52-
with_lock lock (fun () ->
53-
let ( ++ ) = Clock.add_span in
54-
Ipq.add queue
55-
{
56-
Ipq.ev= {func= newfunc; ty; name}
57-
; Ipq.time= Mtime_clock.now () ++ start
58-
}
59-
) ;
60-
if signal then Delay.signal delay
53+
let add_to_queue name ty start newfunc =
54+
let ( ++ ) = Clock.add_span in
55+
let item =
56+
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
57+
in
58+
with_lock lock (fun () -> Ipq.add queue item) ;
59+
Delay.signal delay
6160

6261
let remove_from_queue name =
63-
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
64-
if index > -1 then
65-
Ipq.remove queue index
62+
with_lock lock @@ fun () ->
63+
match !pending_event with
64+
| Some ev when ev.name = name ->
65+
pending_event := None
66+
| Some _ | None ->
67+
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
68+
if index > -1 then
69+
Ipq.remove queue index
70+
71+
let add_periodic_pending () =
72+
with_lock lock @@ fun () ->
73+
match !pending_event with
74+
| Some ({ty= Periodic timer; _} as ev) ->
75+
let ( ++ ) = Clock.add_span in
76+
let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in
77+
Ipq.add queue item ;
78+
pending_event := None
79+
| Some {ty= OneShot; _} ->
80+
pending_event := None
81+
| None ->
82+
()
6683

6784
let loop () =
6885
debug "%s started" __MODULE__ ;
6986
try
7087
while true do
71-
let empty = with_lock lock (fun () -> Ipq.is_empty queue) in
72-
if empty then
73-
Thread.delay 10.0
74-
(* Doesn't happen often - the queue isn't usually empty *)
75-
else
76-
let next = with_lock lock (fun () -> Ipq.maximum queue) in
77-
let now = Mtime_clock.now () in
78-
if next.Ipq.time < now then (
79-
let todo =
80-
(with_lock lock (fun () -> Ipq.pop_maximum queue)).Ipq.ev
81-
in
88+
let now = Mtime_clock.now () in
89+
let deadline, item =
90+
with_lock lock @@ fun () ->
91+
(* empty: wait till we get something *)
92+
if Ipq.is_empty queue then
93+
(Clock.add_span now 10.0, None)
94+
else
95+
let next = Ipq.maximum queue in
96+
if Mtime.is_later next.Ipq.time ~than:now then
97+
(* not expired: wait till time or interrupted *)
98+
(next.Ipq.time, None)
99+
else (
100+
(* remove expired item *)
101+
Ipq.pop_maximum queue |> ignore ;
102+
(* save periodic to be scheduled again *)
103+
if next.Ipq.ev.ty <> OneShot then pending_event := Some next.Ipq.ev ;
104+
(now, Some next.Ipq.ev)
105+
)
106+
in
107+
match item with
108+
| Some todo ->
82109
(try todo.func () with _ -> ()) ;
83-
match todo.ty with
84-
| OneShot ->
85-
()
86-
| Periodic timer ->
87-
add_to_queue ~signal:false todo.name todo.ty timer todo.func
88-
) else (* Sleep until next event. *)
110+
add_periodic_pending ()
111+
| None -> (
112+
(* Sleep until next event. *)
89113
let sleep =
90-
Mtime.(span next.Ipq.time now)
91-
|> Mtime.Span.(add ms)
92-
|> Clock.span_to_s
114+
Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s
93115
in
94116
try ignore (Delay.wait delay sleep)
95117
with e ->
@@ -105,6 +127,7 @@ let loop () =
105127
normal delay. New events may be missed."
106128
detailed_msg ;
107129
Thread.delay sleep
130+
)
108131
done
109132
with _ ->
110133
error

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ type func_ty =
1818
| OneShot (** Fire just once *)
1919
| Periodic of float (** Fire periodically with a given period in seconds *)
2020

21-
val add_to_queue :
22-
?signal:bool -> string -> func_ty -> float -> (unit -> unit) -> unit
21+
val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit
2322
(** Start a new timer. *)
2423

2524
val remove_from_queue : string -> unit
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
(*
2+
* Copyright (C) 2024 Cloud Software Group
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
module Scheduler = Xapi_stdext_threads_scheduler.Scheduler
16+
17+
let started = Atomic.make false
18+
19+
let start_schedule () =
20+
if not (Atomic.exchange started true) then
21+
Thread.create Scheduler.loop () |> ignore
22+
23+
let send event data = Event.(send event data |> sync)
24+
25+
let receive event = Event.(receive event |> sync)
26+
27+
let elapsed_ms cnt =
28+
let elapsed_ns = Mtime_clock.count cnt |> Mtime.Span.to_uint64_ns in
29+
Int64.(div elapsed_ns 1000000L |> to_int)
30+
31+
let is_less = Alcotest.(testable (pp int)) Stdlib.( > )
32+
33+
let test_single () =
34+
let finished = Event.new_channel () in
35+
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
36+
send finished true
37+
) ;
38+
start_schedule () ;
39+
Alcotest.(check bool) "result" true (receive finished)
40+
41+
let test_remove_self () =
42+
let which = Event.new_channel () in
43+
Scheduler.add_to_queue "self" (Scheduler.Periodic 0.001) 0.001 (fun () ->
44+
(* this should remove the periodic scheduling *)
45+
Scheduler.remove_from_queue "self" ;
46+
(* add an operation to stop the test *)
47+
Scheduler.add_to_queue "stop" Scheduler.OneShot 0.1 (fun () ->
48+
send which "stop"
49+
) ;
50+
send which "self"
51+
) ;
52+
start_schedule () ;
53+
let cnt = Mtime_clock.counter () in
54+
Alcotest.(check string) "same event name" "self" (receive which) ;
55+
Alcotest.(check string) "same event name" "stop" (receive which) ;
56+
let elapsed_ms = elapsed_ms cnt in
57+
Alcotest.check is_less "small time" 300 elapsed_ms
58+
59+
let test_empty () =
60+
let finished = Event.new_channel () in
61+
Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () ->
62+
send finished true
63+
) ;
64+
start_schedule () ;
65+
Alcotest.(check bool) "finished" true (receive finished) ;
66+
(* wait loop to go to wait with no work to do *)
67+
Thread.delay 0.1 ;
68+
Scheduler.add_to_queue "two" Scheduler.OneShot 0.001 (fun () ->
69+
send finished true
70+
) ;
71+
let cnt = Mtime_clock.counter () in
72+
Alcotest.(check bool) "finished" true (receive finished) ;
73+
let elapsed_ms = elapsed_ms cnt in
74+
Alcotest.check is_less "small time" 100 elapsed_ms
75+
76+
let test_wakeup () =
77+
let which = Event.new_channel () in
78+
(* schedule a long event *)
79+
Scheduler.add_to_queue "long" Scheduler.OneShot 2.0 (fun () ->
80+
send which "long"
81+
) ;
82+
start_schedule () ;
83+
(* wait loop to go to wait with no work to do *)
84+
Thread.delay 0.1 ;
85+
let cnt = Mtime_clock.counter () in
86+
(* schedule a quick event, should wake up the loop *)
87+
Scheduler.add_to_queue "quick" Scheduler.OneShot 0.1 (fun () ->
88+
send which "quick"
89+
) ;
90+
Alcotest.(check string) "same event name" "quick" (receive which) ;
91+
Scheduler.remove_from_queue "long" ;
92+
let elapsed_ms = elapsed_ms cnt in
93+
Alcotest.check is_less "small time" 150 elapsed_ms
94+
95+
let tests =
96+
[
97+
("test_single", `Quick, test_single)
98+
; ("test_remove_self", `Quick, test_remove_self)
99+
; ("test_empty", `Quick, test_empty)
100+
; ("test_wakeup", `Quick, test_wakeup)
101+
]
102+
103+
let () = Alcotest.run "Scheduler" [("generic", tests)]

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.mli

Whitespace-only changes.

0 commit comments

Comments
 (0)