Skip to content

Commit 5f1a59c

Browse files
authored
CP-51692: use Event.from instead of Event.next (#6125)
Event.next can lose events, and we try to encourage our API users to always use Event.from instead. My understanding (which could be wrong) is that Event.from will give you the events since a given point in time (token), whereas Event.next only gives you events from when you called the API (missing events between API calls). This PR introduces a feature flag that we can use to switch the internal users of Event.next to Event.from. While doing so I discovered a bug in ocaml-rpc, where `int32_of_rpc` doesn't accept Int32 as input, just Int. There is a workaround here for now, but I intend to fix this upstream instead, hence the draft PR.
2 parents d2d21f1 + ace50ae commit 5f1a59c

File tree

8 files changed

+245
-97
lines changed

8 files changed

+245
-97
lines changed

ocaml/tests/bench/bench_throttle2.ml

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
open Bechamel
2+
3+
let () =
4+
Suite_init.harness_init () ;
5+
Debug.set_level Syslog.Warning
6+
7+
let __context, _ = Test_event_common.event_setup_common ()
8+
9+
let allocate_tasks n =
10+
( __context
11+
, Array.init n @@ fun i ->
12+
let label = Printf.sprintf "task %d" i in
13+
Xapi_task.create ~__context ~label ~description:"test task"
14+
)
15+
16+
let free_tasks (__context, tasks) =
17+
let () =
18+
tasks |> Array.iter @@ fun self -> Xapi_task.destroy ~__context ~self
19+
in
20+
()
21+
22+
let set_pending tasks =
23+
tasks
24+
|> Array.iter @@ fun self ->
25+
Xapi_task.set_status ~__context ~self ~value:`pending
26+
27+
let run_tasks _n (__context, tasks) =
28+
set_pending tasks ;
29+
let () =
30+
tasks
31+
|> Array.iter @@ fun self ->
32+
Xapi_task.set_status ~__context ~self ~value:`success
33+
in
34+
tasks |> Array.iter @@ fun t -> Helpers.Task.wait_for ~__context ~tasks:[t]
35+
36+
let run_tasks' _n (__context, tasks) =
37+
set_pending tasks ;
38+
let () =
39+
tasks
40+
|> Array.iter @@ fun self ->
41+
Xapi_task.set_status ~__context ~self ~value:`success
42+
in
43+
Helpers.Task.wait_for ~__context ~tasks:(Array.to_list tasks)
44+
45+
module D = Debug.Make (struct let name = __MODULE__ end)
46+
47+
let run_tasks'' n (__context, tasks) =
48+
set_pending tasks ;
49+
let finished = Atomic.make 0 in
50+
let (t : Thread.t) =
51+
Thread.create
52+
(fun () ->
53+
for _ = 1 to 10 do
54+
Thread.yield ()
55+
done ;
56+
tasks
57+
|> Array.iter @@ fun self ->
58+
Xapi_task.set_status ~__context ~self ~value:`success ;
59+
Atomic.incr finished
60+
)
61+
()
62+
in
63+
Helpers.Task.wait_for ~__context ~tasks:(Array.to_list tasks) ;
64+
let f = Atomic.get finished in
65+
assert (f = n || f = n - 1) ;
66+
Thread.join t
67+
68+
let benchmarks =
69+
Test.make_grouped ~name:"Task latency"
70+
[
71+
Test.make_indexed_with_resource ~name:"task complete+wait latency"
72+
~args:[1; 10; 100] Test.multiple ~allocate:allocate_tasks
73+
~free:free_tasks (fun n -> Staged.stage (run_tasks n)
74+
)
75+
; Test.make_indexed_with_resource ~name:"task complete+wait all latency"
76+
~args:[1; 10; 100] Test.multiple ~allocate:allocate_tasks
77+
~free:free_tasks (fun n -> Staged.stage (run_tasks' n)
78+
)
79+
; Test.make_indexed_with_resource
80+
~name:"task complete+wait all latency (thread)" ~args:[1; 10; 100]
81+
Test.multiple ~allocate:allocate_tasks ~free:free_tasks (fun n ->
82+
Staged.stage (run_tasks'' n)
83+
)
84+
]
85+
86+
let () = Bechamel_simple_cli.cli benchmarks

ocaml/tests/bench/dune

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
(executables
2-
(names bench_tracing bench_uuid)
3-
(libraries tracing bechamel bechamel-notty notty.unix tracing_export threads.posix fmt notty uuid)
2+
(names bench_tracing bench_uuid bench_throttle2)
3+
(libraries tracing bechamel bechamel-notty notty.unix tracing_export threads.posix fmt notty uuid xapi_aux tests_common log xapi_internal)
44
)

ocaml/xapi-cli-server/cli_operations.ml

Lines changed: 104 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -2848,8 +2848,6 @@ exception Finished
28482848
let event_wait_gen rpc session_id classname record_matches =
28492849
(* Immediately register *)
28502850
let classes = [classname] in
2851-
Client.Event.register ~rpc ~session_id ~classes ;
2852-
debug "Registered for events" ;
28532851
(* Check to see if the condition is already satisfied - get all objects of whatever class specified... *)
28542852
let poll () =
28552853
let current_tbls =
@@ -2930,96 +2928,111 @@ let event_wait_gen rpc session_id classname record_matches =
29302928
in
29312929
List.exists record_matches all_recs
29322930
in
2933-
finally
2934-
(fun () ->
2935-
if not (poll ()) then
2936-
try
2937-
while true do
2938-
try
2939-
let events =
2940-
Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id)
2941-
in
2942-
let doevent event =
2943-
let tbl =
2944-
match Event_helper.record_of_event event with
2945-
| Event_helper.VM (r, Some x) ->
2946-
let record = vm_record rpc session_id r in
2947-
record.setrefrec (r, x) ;
2948-
record.fields
2949-
| Event_helper.VDI (r, Some x) ->
2950-
let record = vdi_record rpc session_id r in
2951-
record.setrefrec (r, x) ;
2952-
record.fields
2953-
| Event_helper.SR (r, Some x) ->
2954-
let record = sr_record rpc session_id r in
2955-
record.setrefrec (r, x) ;
2956-
record.fields
2957-
| Event_helper.Host (r, Some x) ->
2958-
let record = host_record rpc session_id r in
2959-
record.setrefrec (r, x) ;
2960-
record.fields
2961-
| Event_helper.Network (r, Some x) ->
2962-
let record = net_record rpc session_id r in
2963-
record.setrefrec (r, x) ;
2964-
record.fields
2965-
| Event_helper.VIF (r, Some x) ->
2966-
let record = vif_record rpc session_id r in
2967-
record.setrefrec (r, x) ;
2968-
record.fields
2969-
| Event_helper.PIF (r, Some x) ->
2970-
let record = pif_record rpc session_id r in
2971-
record.setrefrec (r, x) ;
2972-
record.fields
2973-
| Event_helper.VBD (r, Some x) ->
2974-
let record = vbd_record rpc session_id r in
2975-
record.setrefrec (r, x) ;
2976-
record.fields
2977-
| Event_helper.PBD (r, Some x) ->
2978-
let record = pbd_record rpc session_id r in
2979-
record.setrefrec (r, x) ;
2980-
record.fields
2981-
| Event_helper.Pool (r, Some x) ->
2982-
let record = pool_record rpc session_id r in
2983-
record.setrefrec (r, x) ;
2984-
record.fields
2985-
| Event_helper.Task (r, Some x) ->
2986-
let record = task_record rpc session_id r in
2987-
record.setrefrec (r, x) ;
2988-
record.fields
2989-
| Event_helper.VMSS (r, Some x) ->
2990-
let record = vmss_record rpc session_id r in
2991-
record.setrefrec (r, x) ;
2992-
record.fields
2993-
| Event_helper.Secret (r, Some x) ->
2994-
let record = secret_record rpc session_id r in
2995-
record.setrefrec (r, x) ;
2996-
record.fields
2997-
| _ ->
2998-
failwith
2999-
("Cli listening for class '"
3000-
^ classname
3001-
^ "' not currently implemented"
3002-
)
3003-
in
3004-
let record =
3005-
List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl
3006-
in
3007-
if record_matches record then raise Finished
2931+
let use_event_next = !Constants.use_event_next in
2932+
let run () =
2933+
if not (poll ()) then
2934+
try
2935+
let token = ref "" in
2936+
while true do
2937+
let events =
2938+
if use_event_next then
2939+
Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id)
2940+
else
2941+
let event_from =
2942+
Event_types.event_from_of_rpc
2943+
(Client.Event.from ~rpc ~session_id ~timeout:30. ~token:!token
2944+
~classes
2945+
)
30082946
in
3009-
List.iter doevent
3010-
(List.filter (fun e -> e.Event_types.snapshot <> None) events)
3011-
with
3012-
| Api_errors.Server_error (code, _)
3013-
when code = Api_errors.events_lost
3014-
->
3015-
debug "Got EVENTS_LOST; reregistering" ;
3016-
Client.Event.unregister ~rpc ~session_id ~classes ;
3017-
Client.Event.register ~rpc ~session_id ~classes ;
3018-
if poll () then raise Finished
3019-
done
3020-
with Finished -> ()
3021-
)
3022-
(fun () -> Client.Event.unregister ~rpc ~session_id ~classes)
2947+
token := event_from.token ;
2948+
event_from.events
2949+
in
2950+
let doevent event =
2951+
let tbl =
2952+
match Event_helper.record_of_event event with
2953+
| Event_helper.VM (r, Some x) ->
2954+
let record = vm_record rpc session_id r in
2955+
record.setrefrec (r, x) ;
2956+
record.fields
2957+
| Event_helper.VDI (r, Some x) ->
2958+
let record = vdi_record rpc session_id r in
2959+
record.setrefrec (r, x) ;
2960+
record.fields
2961+
| Event_helper.SR (r, Some x) ->
2962+
let record = sr_record rpc session_id r in
2963+
record.setrefrec (r, x) ;
2964+
record.fields
2965+
| Event_helper.Host (r, Some x) ->
2966+
let record = host_record rpc session_id r in
2967+
record.setrefrec (r, x) ;
2968+
record.fields
2969+
| Event_helper.Network (r, Some x) ->
2970+
let record = net_record rpc session_id r in
2971+
record.setrefrec (r, x) ;
2972+
record.fields
2973+
| Event_helper.VIF (r, Some x) ->
2974+
let record = vif_record rpc session_id r in
2975+
record.setrefrec (r, x) ;
2976+
record.fields
2977+
| Event_helper.PIF (r, Some x) ->
2978+
let record = pif_record rpc session_id r in
2979+
record.setrefrec (r, x) ;
2980+
record.fields
2981+
| Event_helper.VBD (r, Some x) ->
2982+
let record = vbd_record rpc session_id r in
2983+
record.setrefrec (r, x) ;
2984+
record.fields
2985+
| Event_helper.PBD (r, Some x) ->
2986+
let record = pbd_record rpc session_id r in
2987+
record.setrefrec (r, x) ;
2988+
record.fields
2989+
| Event_helper.Pool (r, Some x) ->
2990+
let record = pool_record rpc session_id r in
2991+
record.setrefrec (r, x) ;
2992+
record.fields
2993+
| Event_helper.Task (r, Some x) ->
2994+
let record = task_record rpc session_id r in
2995+
record.setrefrec (r, x) ;
2996+
record.fields
2997+
| Event_helper.VMSS (r, Some x) ->
2998+
let record = vmss_record rpc session_id r in
2999+
record.setrefrec (r, x) ;
3000+
record.fields
3001+
| Event_helper.Secret (r, Some x) ->
3002+
let record = secret_record rpc session_id r in
3003+
record.setrefrec (r, x) ;
3004+
record.fields
3005+
| _ ->
3006+
failwith
3007+
("Cli listening for class '"
3008+
^ classname
3009+
^ "' not currently implemented"
3010+
)
3011+
in
3012+
let record =
3013+
List.map (fun r -> (r.name, fun () -> safe_get_field r)) tbl
3014+
in
3015+
if record_matches record then raise_notrace Finished
3016+
in
3017+
List.iter doevent
3018+
(List.filter (fun e -> e.Event_types.snapshot <> None) events)
3019+
done
3020+
with
3021+
| Api_errors.Server_error (code, _)
3022+
when code = Api_errors.events_lost && use_event_next ->
3023+
debug "Got EVENTS_LOST; reregistering" ;
3024+
Client.Event.unregister ~rpc ~session_id ~classes ;
3025+
Client.Event.register ~rpc ~session_id ~classes ;
3026+
if poll () then raise Finished
3027+
| Finished ->
3028+
()
3029+
in
3030+
if use_event_next then (
3031+
Client.Event.register ~rpc ~session_id ~classes ;
3032+
debug "Registered for events" ;
3033+
finally run (fun () -> Client.Event.unregister ~rpc ~session_id ~classes)
3034+
) else
3035+
run ()
30233036
30243037
(* We're done. Unregister and finish *)
30253038

ocaml/xapi-cli-server/cli_util.ml

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,41 @@ exception Cli_failure of string
4242

4343
(** call [callback task_record] on every update to the task, until it completes or fails *)
4444
let track callback rpc (session_id : API.ref_session) task =
45-
let classes = ["task"] in
45+
let use_event_next = !Constants.use_event_next in
46+
let classes =
47+
if use_event_next then
48+
["task"]
49+
else
50+
[Printf.sprintf "task/%s" (Ref.string_of task)]
51+
in
4652
finally
4753
(fun () ->
4854
let finished = ref false in
4955
while not !finished do
50-
Client.Event.register ~rpc ~session_id ~classes ;
56+
if use_event_next then
57+
Client.Event.register ~rpc ~session_id ~classes ;
5158
try
5259
(* Need to check once after registering to avoid a race *)
5360
finished :=
5461
Client.Task.get_status ~rpc ~session_id ~self:task <> `pending ;
62+
let token = ref "" in
5563
while not !finished do
5664
let events =
57-
Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id)
65+
if use_event_next then
66+
let events =
67+
Event_types.events_of_rpc (Client.Event.next ~rpc ~session_id)
68+
in
69+
List.map Event_helper.record_of_event events
70+
else
71+
let event_from =
72+
Event_types.event_from_of_rpc
73+
(Client.Event.from ~rpc ~session_id ~classes ~token:!token
74+
~timeout:30.
75+
)
76+
in
77+
token := event_from.token ;
78+
List.map Event_helper.record_of_event event_from.events
5879
in
59-
let events = List.map Event_helper.record_of_event events in
6080
List.iter
6181
(function
6282
| Event_helper.Task (t, Some t_rec) when t = task ->

ocaml/xapi-consts/constants.ml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,9 @@ let owner_key = "owner"
275275

276276
(* set in VBD other-config to indicate that clients can delete the attached VDI on VM uninstall if they want.. *)
277277

278+
(* xapi-cli-server doesn't link xapi-globs *)
279+
let use_event_next = ref true
280+
278281
(* the time taken to wait before restarting in a different mode for pool eject/join operations *)
279282
let fuse_time = ref 10.
280283

ocaml/xapi-types/event_types.ml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,24 @@ let rec rpc_of_event_from e =
7777
; ("token", rpc_of_token e.token)
7878
]
7979

80+
(* xmlrpc and jsonrpc would map Int32 to Int, but int32_of_rpc can't actually parse
81+
an Int32 back as an int32... this is a bug in ocaml-rpc that should be fixed.
82+
meanwhile work around it by mapping Rpc.Int32 to Rpc.Int upon receiving the message
83+
(it is only Rpc.Int32 for backward compat with non-XAPI Xmlrpc clients)
84+
*)
85+
86+
let rec fixup_int32 = function
87+
| Rpc.Dict dict ->
88+
Rpc.Dict (List.map fixup_kv dict)
89+
| Rpc.Int32 i ->
90+
Rpc.Int (Int64.of_int32 i)
91+
| rpc ->
92+
rpc
93+
94+
and fixup_kv (k, v) = (k, fixup_int32 v)
95+
96+
let event_from_of_rpc rpc = rpc |> fixup_int32 |> event_from_of_rpc
97+
8098
(** Return result of an events.from call *)
8199

82100
open Printf

ocaml/xapi/taskHelper.mli

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ val set_result : __context:Context.t -> Rpc.t option -> unit
3636

3737
val status_is_completed : [> `cancelled | `failure | `success] -> bool
3838

39+
val status_to_string :
40+
[< `pending | `success | `failure | `cancelling | `cancelled] -> string
41+
3942
val complete : __context:Context.t -> Rpc.t option -> unit
4043

4144
val set_cancellable : __context:Context.t -> unit

0 commit comments

Comments
 (0)