Skip to content

Commit eaeca75

Browse files
committed
Fix multiple issues in periodic scheduler
- Remove signal parameter from add_to_queue. The parameter was false only to support an internal usage, external users should always alert the thread loop; - Do not use ">" or other operators to compare Mtime.t, the value is intended to be unsigned and should be compared with Int64.unsigned_compare as Mtime functions do; - Limit mutex contention in add_to_queue; - Protect queue with mutex in remove_from_queue; - Do not wait huge amount of time if the queue is empty but use Delay.wait if possible; - Fix delete of periodic events. In case the event is processed it's removed from the queue. Previously remove_from_queue was not able to mark this event as removed; - Do not race between checking the first event and removing it. These 2 actions were done in 2 separate critical section, now they are done in a single one. Signed-off-by: Frediano Ziglio <[email protected]>
1 parent ea4dce4 commit eaeca75

File tree

2 files changed

+60
-35
lines changed

2 files changed

+60
-35
lines changed

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml

Lines changed: 59 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ let delay = Delay.make ()
2727

2828
let queue_default = {func= (fun () -> ()); ty= OneShot; name= ""}
2929

30+
let (pending_event : t option ref) = ref None
31+
3032
let (queue : t Ipq.t) = Ipq.create 50 queue_default
3133

3234
let lock = Mutex.create ()
@@ -48,48 +50,71 @@ module Clock = struct
4850
Mtime.min_stamp
4951
end
5052

51-
let add_to_queue ?(signal = true) name ty start newfunc =
52-
with_lock lock (fun () ->
53-
let ( ++ ) = Clock.add_span in
54-
Ipq.add queue
55-
{
56-
Ipq.ev= {func= newfunc; ty; name}
57-
; Ipq.time= Mtime_clock.now () ++ start
58-
}
59-
) ;
60-
if signal then Delay.signal delay
53+
let add_to_queue name ty start newfunc =
54+
let ( ++ ) = Clock.add_span in
55+
let item =
56+
{Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start}
57+
in
58+
with_lock lock (fun () -> Ipq.add queue item) ;
59+
Delay.signal delay
6160

6261
let remove_from_queue name =
63-
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
64-
if index > -1 then
65-
Ipq.remove queue index
62+
with_lock lock @@ fun () ->
63+
match !pending_event with
64+
| Some ev when ev.name = name ->
65+
pending_event := None
66+
| Some _ | None ->
67+
let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in
68+
if index > -1 then
69+
Ipq.remove queue index
70+
71+
let add_periodic_pending () =
72+
with_lock lock @@ fun () ->
73+
match !pending_event with
74+
| Some ev ->
75+
( match ev.ty with
76+
| Periodic timer ->
77+
let ( ++ ) = Clock.add_span in
78+
let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in
79+
Ipq.add queue item
80+
| OneShot ->
81+
()
82+
) ;
83+
pending_event := None
84+
| None ->
85+
()
6686

6787
let loop () =
6888
debug "%s started" __MODULE__ ;
6989
try
7090
while true do
71-
let empty = with_lock lock (fun () -> Ipq.is_empty queue) in
72-
if empty then
73-
Thread.delay 10.0
74-
(* Doesn't happen often - the queue isn't usually empty *)
75-
else
76-
let next = with_lock lock (fun () -> Ipq.maximum queue) in
77-
let now = Mtime_clock.now () in
78-
if next.Ipq.time < now then (
79-
let todo =
80-
(with_lock lock (fun () -> Ipq.pop_maximum queue)).Ipq.ev
81-
in
91+
let now = Mtime_clock.now () in
92+
let deadline, item =
93+
with_lock lock @@ fun () ->
94+
(* empty: wait till we get something *)
95+
if Ipq.is_empty queue then
96+
(Clock.add_span now 20.0, None)
97+
else
98+
let next = Ipq.maximum queue in
99+
if Mtime.is_later next.Ipq.time ~than:now then
100+
(* not expired: wait till time or interrupted *)
101+
(next.Ipq.time, None)
102+
else (
103+
(* remove expired item *)
104+
Ipq.pop_maximum queue |> ignore ;
105+
(* save periodic to be scheduled again *)
106+
if next.Ipq.ev.ty <> OneShot then pending_event := Some next.Ipq.ev ;
107+
(now, Some next.Ipq.ev)
108+
)
109+
in
110+
match item with
111+
| Some todo ->
82112
(try todo.func () with _ -> ()) ;
83-
match todo.ty with
84-
| OneShot ->
85-
()
86-
| Periodic timer ->
87-
add_to_queue ~signal:false todo.name todo.ty timer todo.func
88-
) else (* Sleep until next event. *)
113+
add_periodic_pending ()
114+
| None -> (
115+
(* Sleep until next event. *)
89116
let sleep =
90-
Mtime.(span next.Ipq.time now)
91-
|> Mtime.Span.(add ms)
92-
|> Clock.span_to_s
117+
Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s
93118
in
94119
try ignore (Delay.wait delay sleep)
95120
with e ->
@@ -105,6 +130,7 @@ let loop () =
105130
normal delay. New events may be missed."
106131
detailed_msg ;
107132
Thread.delay sleep
133+
)
108134
done
109135
with _ ->
110136
error

ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ type func_ty =
1818
| OneShot (** Fire just once *)
1919
| Periodic of float (** Fire periodically with a given period in seconds *)
2020

21-
val add_to_queue :
22-
?signal:bool -> string -> func_ty -> float -> (unit -> unit) -> unit
21+
val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit
2322
(** Start a new timer. *)
2423

2524
val remove_from_queue : string -> unit

0 commit comments

Comments
 (0)