Skip to content

Commit be3de8f

Browse files
committed
Add test (excluding postMessage notification)
1 parent 8e17f91 commit be3de8f

File tree

4 files changed

+298
-9
lines changed

4 files changed

+298
-9
lines changed

system/lib/pthread/proxying.c

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -239,15 +239,18 @@ int emscripten_proxy_async(em_proxying_queue* q,
239239
goto failed;
240240
}
241241
pthread_mutex_unlock(&q->mutex);
242-
// If the queue was previously empty, notify the target thread to process it.
243-
// Otherwise, the target thread was already notified when the existing work
244-
// was enqueued so we don't need to notify it again.
245-
if (empty) {
246-
// TODO: Add `q` to this notification so the target thread knows which queue
247-
// to process.
248-
_emscripten_notify_thread_queue(target_thread,
249-
emscripten_main_browser_thread_id());
250-
}
242+
/* // If the queue was previously empty, notify the target thread to process
243+
* it. */
244+
/* // Otherwise, the target thread was already notified when the existing work
245+
*/
246+
/* // was enqueued so we don't need to notify it again. */
247+
/* if (empty) { */
248+
/* // TODO: Add `q` to this notification so the target thread knows which
249+
* queue */
250+
/* // to process. */
251+
/* _emscripten_notify_thread_queue(target_thread, */
252+
/* emscripten_main_browser_thread_id()); */
253+
/* } */
251254
return 1;
252255

253256
failed:

tests/pthread/test_pthread_proxying.c

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
#define _GNU_SOURCE
2+
#include <assert.h>
3+
#include <emscripten.h>
4+
#include <pthread.h>
5+
#include <sched.h>
6+
#include <stdio.h>
7+
#include <stdlib.h>
8+
9+
#include "proxying.h"
10+
11+
// The worker threads we will use. `looper` sits in a loop, continuously
12+
// processing work as it becomes available, while `returner` returns to the JS
13+
// event loop each time it processes work.
14+
pthread_t main_thread;
15+
pthread_t looper;
16+
pthread_t returner;
17+
18+
// The queue used to send work to both `looper` and `returner`.
19+
em_proxying_queue* proxy_queue = NULL;
20+
_Atomic int should_quit = 0;
21+
22+
void* looper_main(void* arg) {
23+
while (!should_quit) {
24+
emscripten_proxy_execute_queue(proxy_queue);
25+
sched_yield();
26+
}
27+
return NULL;
28+
}
29+
30+
void* returner_main(void* arg) { return NULL; }
31+
32+
typedef struct widget {
33+
// `val` will be stored to `out` and the current thread will be stored to
34+
// `thread` when the widget is run.
35+
int* out;
36+
int val;
37+
pthread_t thread;
38+
39+
// Synchronization to allow waiting on a widget to run.
40+
pthread_mutex_t mutex;
41+
pthread_cond_t cond;
42+
43+
// Nonzero iff the widget has been run.
44+
int done;
45+
46+
// Only used for async_as_sync tests.
47+
em_proxying_ctx* ctx;
48+
} widget;
49+
50+
void init_widget(widget* w, int* out, int val) {
51+
*w = (widget){.out = out,
52+
.val = val,
53+
// .thread will be set in `run_widget`.
54+
.mutex = PTHREAD_MUTEX_INITIALIZER,
55+
.cond = PTHREAD_COND_INITIALIZER,
56+
.done = 0,
57+
.ctx = NULL};
58+
}
59+
60+
void destroy_widget(widget* w) {
61+
pthread_mutex_destroy(&w->mutex);
62+
pthread_cond_destroy(&w->cond);
63+
}
64+
65+
void run_widget(widget* w) {
66+
pthread_t self = pthread_self();
67+
const char* name = pthread_equal(self, main_thread) ? "main"
68+
: pthread_equal(self, looper) ? "looper"
69+
: pthread_equal(self, returner) ? "returner"
70+
: "unknown";
71+
printf("running widget %d on %s\n", w->val, name);
72+
pthread_mutex_lock(&w->mutex);
73+
if (w->out) {
74+
*w->out = w->val;
75+
}
76+
w->thread = pthread_self();
77+
w->done = 1;
78+
pthread_mutex_unlock(&w->mutex);
79+
pthread_cond_broadcast(&w->cond);
80+
}
81+
82+
void await_widget(widget* w) {
83+
pthread_mutex_lock(&w->mutex);
84+
while (!w->done) {
85+
pthread_cond_wait(&w->cond, &w->mutex);
86+
}
87+
pthread_mutex_unlock(&w->mutex);
88+
}
89+
90+
// Helper functions we will proxy to perform our work.
91+
92+
void do_run_widget(void* arg) { run_widget((widget*)arg); }
93+
94+
void finish_running_widget(void* arg) {
95+
widget* w = (widget*)arg;
96+
run_widget(w);
97+
emscripten_proxy_finish(w->ctx);
98+
}
99+
100+
void start_running_widget(em_proxying_ctx* ctx, void* arg) {
101+
((widget*)arg)->ctx = ctx;
102+
emscripten_async_call(finish_running_widget, arg, 0);
103+
}
104+
105+
void start_and_finish_running_widget(em_proxying_ctx* ctx, void* arg) {
106+
((widget*)arg)->ctx = ctx;
107+
finish_running_widget(arg);
108+
}
109+
110+
// Main test functions
111+
112+
void test_proxy_async(void) {
113+
printf("Testing async proxying\n");
114+
115+
int i = 0;
116+
widget w1, w2, w3;
117+
init_widget(&w1, &i, 1);
118+
init_widget(&w2, &i, 2);
119+
init_widget(&w3, &i, 3);
120+
121+
// Proxy to ourselves.
122+
emscripten_proxy_async(proxy_queue, pthread_self(), do_run_widget, &w1);
123+
assert(!w1.done);
124+
emscripten_proxy_execute_queue(proxy_queue);
125+
assert(i == 1);
126+
assert(w1.done);
127+
assert(pthread_equal(w1.thread, pthread_self()));
128+
129+
// Proxy to looper.
130+
emscripten_proxy_async(proxy_queue, looper, do_run_widget, &w2);
131+
await_widget(&w2);
132+
assert(i == 2);
133+
assert(w2.done);
134+
assert(pthread_equal(w2.thread, looper));
135+
136+
/* // Proxy to returner. */
137+
/* emscripten_proxy_async(proxy_queue, returner, do_run_widget, &w3); */
138+
/* await_widget(&w3); */
139+
/* assert(i == 3); */
140+
/* assert(w3.done); */
141+
/* assert(pthread_equal(w3.thread, returner)); */
142+
143+
destroy_widget(&w1);
144+
destroy_widget(&w2);
145+
destroy_widget(&w3);
146+
}
147+
148+
void test_proxy_sync(void) {
149+
printf("Testing sync proxying\n");
150+
151+
int i = 0;
152+
widget w4, w5;
153+
init_widget(&w4, &i, 4);
154+
init_widget(&w5, &i, 5);
155+
156+
// Proxy to looper.
157+
emscripten_proxy_sync(proxy_queue, looper, do_run_widget, &w4);
158+
assert(i == 4);
159+
assert(w4.done);
160+
assert(pthread_equal(w4.thread, looper));
161+
162+
/* // Proxy to returner. */
163+
/* emscripten_proxy_sync(proxy_queue, returner, do_run_widget, &w5); */
164+
/* assert(i == 5); */
165+
/* assert(w5.done); */
166+
/* assert(pthread_equal(w5.thread, returner)); */
167+
168+
destroy_widget(&w4);
169+
destroy_widget(&w5);
170+
}
171+
172+
void test_proxy_sync_with_ctx(void) {
173+
printf("Testing sync_with_ctx proxying\n");
174+
175+
int i = 0;
176+
widget w6, w7;
177+
init_widget(&w6, &i, 6);
178+
init_widget(&w7, &i, 7);
179+
180+
// Proxy to looper.
181+
emscripten_proxy_sync_with_ctx(
182+
proxy_queue, looper, start_and_finish_running_widget, &w6);
183+
assert(i == 6);
184+
assert(w6.done);
185+
assert(pthread_equal(w6.thread, looper));
186+
187+
/* // Proxy to returner. */
188+
/* emscripten_proxy_sync_with_ctx(proxy_queue, returner, start_running_widget,
189+
* &w7); */
190+
/* assert(i == 7); */
191+
/* assert(w7.done); */
192+
/* assert(pthread_equal(w7.thread, returner)); */
193+
194+
destroy_widget(&w6);
195+
destroy_widget(&w7);
196+
}
197+
198+
typedef struct increment_to_arg {
199+
int* ip;
200+
int i;
201+
} increment_to_arg;
202+
203+
void increment_to(void* arg_p) {
204+
increment_to_arg* arg = (increment_to_arg*)arg_p;
205+
assert(*arg->ip == arg->i - 1);
206+
*arg->ip = arg->i;
207+
free(arg);
208+
}
209+
210+
void test_queue_growth(void) {
211+
printf("Testing queue growth\n");
212+
213+
em_proxying_queue* queue = em_proxying_queue_create();
214+
assert(proxy_queue != NULL);
215+
216+
int incremented = 0;
217+
218+
// Initial queue capacity is 128. Force that to double twice with the head at
219+
// index 0 by inserting more than 256 items.
220+
for (int i = 1; i <= 300; i++) {
221+
increment_to_arg* arg = malloc(sizeof(increment_to_arg));
222+
*arg = (increment_to_arg){&incremented, i};
223+
int res = emscripten_proxy_async(queue, pthread_self(), increment_to, arg);
224+
assert(res == 1);
225+
}
226+
227+
// Drain the queue, moving the head somewhere into the middle of the buffer of
228+
// capacity 512.
229+
emscripten_proxy_execute_queue(queue);
230+
assert(incremented == 300);
231+
232+
// Double the queue size twice more by inserting more than 1024 items.
233+
for (int i = 301; i <= 1500; i++) {
234+
increment_to_arg* arg = malloc(sizeof(increment_to_arg));
235+
*arg = (increment_to_arg){&incremented, i};
236+
int res = emscripten_proxy_async(queue, pthread_self(), increment_to, arg);
237+
assert(res == 1);
238+
}
239+
240+
// Drain the queue again.
241+
emscripten_proxy_execute_queue(queue);
242+
assert(incremented == 1500);
243+
244+
em_proxying_queue_destroy(queue);
245+
}
246+
247+
void force_exit(void* arg) { emscripten_force_exit(0); }
248+
249+
int main(int argc, char* argv[]) {
250+
main_thread = pthread_self();
251+
252+
proxy_queue = em_proxying_queue_create();
253+
assert(proxy_queue != NULL);
254+
255+
pthread_create(&looper, NULL, looper_main, NULL);
256+
pthread_create(&returner, NULL, returner_main, NULL);
257+
258+
test_proxy_async();
259+
test_proxy_sync();
260+
test_proxy_sync_with_ctx();
261+
262+
should_quit = 1;
263+
pthread_join(looper, NULL);
264+
pthread_join(returner, NULL);
265+
em_proxying_queue_destroy(proxy_queue);
266+
267+
test_queue_growth();
268+
269+
printf("done\n");
270+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
Testing async proxying
2+
running widget 1 on main
3+
running widget 2 on looper
4+
Testing sync proxying
5+
running widget 4 on looper
6+
Testing sync_with_ctx proxying
7+
running widget 6 on looper
8+
Testing queue growth
9+
done

tests/test_core.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2419,6 +2419,13 @@ def test_pthread_specific(self):
24192419
def test_pthread_equal(self):
24202420
self.do_run_in_out_file_test('pthread/test_pthread_equal.cpp')
24212421

2422+
@node_pthreads
2423+
def test_pthread_proxying(self):
2424+
self.set_setting('EXIT_RUNTIME')
2425+
self.set_setting('PTHREAD_POOL_SIZE=2')
2426+
args = [f'-I{path_from_root("system/lib/pthread")}']
2427+
self.do_run_in_out_file_test('pthread/test_pthread_proxying.c', emcc_args=args)
2428+
24222429
@node_pthreads
24232430
def test_pthread_dispatch_after_exit(self):
24242431
self.do_run_in_out_file_test('pthread/test_pthread_dispatch_after_exit.c', interleaved_output=False)

0 commit comments

Comments
 (0)