Skip to content

Commit cfdeed7

Browse files
committed
src: improve thread safety of TaskQueue
1 parent 5077ea4 commit cfdeed7

File tree

2 files changed

+115
-109
lines changed

2 files changed

+115
-109
lines changed

src/node_platform.cc

Lines changed: 49 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ static void PlatformWorkerThread(void* data) {
4040
worker_data->platform_workers_ready->Signal(lock);
4141
}
4242

43-
while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
43+
while (std::unique_ptr<Task> task = pending_worker_tasks->Lock().BlockingPop()) {
4444
task->Run();
45-
pending_worker_tasks->NotifyOfCompletion();
45+
pending_worker_tasks->Lock().NotifyOfCompletion();
4646
}
4747
}
4848

@@ -73,13 +73,15 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
7373
}
7474

7575
void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
76-
tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
77-
delay_in_seconds));
76+
auto locked = tasks_.Lock();
77+
locked.Push(std::make_unique<ScheduleTask>(this, std::move(task),
78+
delay_in_seconds));
7879
uv_async_send(&flush_tasks_);
7980
}
8081

8182
void Stop() {
82-
tasks_.Push(std::make_unique<StopTask>(this));
83+
auto locked = tasks_.Lock();
84+
locked.Push(std::make_unique<StopTask>(this));
8385
uv_async_send(&flush_tasks_);
8486
}
8587

@@ -100,7 +102,8 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
100102
static void FlushTasks(uv_async_t* flush_tasks) {
101103
DelayedTaskScheduler* scheduler =
102104
ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
103-
while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
105+
auto locked = scheduler->tasks_.Lock();
106+
while (std::unique_ptr<Task> task = locked.Pop())
104107
task->Run();
105108
}
106109

@@ -149,7 +152,7 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
149152
static void RunTask(uv_timer_t* timer) {
150153
DelayedTaskScheduler* scheduler =
151154
ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
152-
scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
155+
scheduler->pending_worker_tasks_->Lock().Push(scheduler->TakeTimerTask(timer));
153156
}
154157

155158
std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
@@ -203,7 +206,7 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
203206
}
204207

205208
void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
206-
pending_worker_tasks_.Push(std::move(task));
209+
pending_worker_tasks_.Lock().Push(std::move(task));
207210
}
208211

209212
void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
@@ -212,11 +215,11 @@ void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
212215
}
213216

214217
void WorkerThreadsTaskRunner::BlockingDrain() {
215-
pending_worker_tasks_.BlockingDrain();
218+
pending_worker_tasks_.Lock().BlockingDrain();
216219
}
217220

218221
void WorkerThreadsTaskRunner::Shutdown() {
219-
pending_worker_tasks_.Stop();
222+
pending_worker_tasks_.Lock().Stop();
220223
delayed_task_scheduler_->Stop();
221224
for (size_t i = 0; i < threads_.size(); i++) {
222225
CHECK_EQ(0, uv_thread_join(threads_[i].get()));
@@ -253,20 +256,22 @@ void PerIsolatePlatformData::PostIdleTaskImpl(
253256

254257
void PerIsolatePlatformData::PostTaskImpl(std::unique_ptr<Task> task,
255258
const v8::SourceLocation& location) {
256-
if (flush_tasks_ == nullptr) {
259+
// Check if flush_tasks is valid before using it
260+
if (!flush_tasks_valid_.load()) {
257261
// V8 may post tasks during Isolate disposal. In that case, the only
258262
// sensible path forward is to discard the task.
259263
return;
260264
}
261-
foreground_tasks_.Push(std::move(task));
265+
foreground_tasks_.Lock().Push(std::move(task));
262266
uv_async_send(flush_tasks_);
263267
}
264268

265269
void PerIsolatePlatformData::PostDelayedTaskImpl(
266270
std::unique_ptr<Task> task,
267271
double delay_in_seconds,
268272
const v8::SourceLocation& location) {
269-
if (flush_tasks_ == nullptr) {
273+
// Check if flush_tasks is valid before using it
274+
if (!flush_tasks_valid_.load()) {
270275
// V8 may post tasks during Isolate disposal. In that case, the only
271276
// sensible path forward is to discard the task.
272277
return;
@@ -275,7 +280,7 @@ void PerIsolatePlatformData::PostDelayedTaskImpl(
275280
delayed->task = std::move(task);
276281
delayed->platform_data = shared_from_this();
277282
delayed->timeout = delay_in_seconds;
278-
foreground_delayed_tasks_.Push(std::move(delayed));
283+
foreground_delayed_tasks_.Lock().Push(std::move(delayed));
279284
uv_async_send(flush_tasks_);
280285
}
281286

@@ -301,15 +306,19 @@ void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
301306
}
302307

303308
void PerIsolatePlatformData::Shutdown() {
304-
if (flush_tasks_ == nullptr)
309+
// First mark the flush_tasks as invalid
310+
bool expected = true;
311+
if (!flush_tasks_valid_.compare_exchange_strong(expected, false)) {
312+
// Already marked invalid
305313
return;
314+
}
306315

307316
// While there should be no V8 tasks in the queues at this point, it is
308317
// possible that Node.js-internal tasks from e.g. the inspector are still
309318
// lying around. We clear these queues and ignore the return value,
310319
// effectively deleting the tasks instead of running them.
311-
foreground_delayed_tasks_.PopAll();
312-
foreground_tasks_.PopAll();
320+
foreground_delayed_tasks_.Lock().PopAll();
321+
foreground_tasks_.Lock().PopAll();
313322
scheduled_delayed_tasks_.clear();
314323

315324
// Both destroying the scheduled_delayed_tasks_ lists and closing
@@ -472,33 +481,36 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
472481
bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
473482
bool did_work = false;
474483

475-
while (std::unique_ptr<DelayedTask> delayed =
476-
foreground_delayed_tasks_.Pop()) {
477-
did_work = true;
478-
uint64_t delay_millis = llround(delayed->timeout * 1000);
479-
480-
delayed->timer.data = static_cast<void*>(delayed.get());
481-
uv_timer_init(loop_, &delayed->timer);
482-
// Timers may not guarantee queue ordering of events with the same delay if
483-
// the delay is non-zero. This should not be a problem in practice.
484-
uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
485-
uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
486-
uv_handle_count_++;
487-
488-
scheduled_delayed_tasks_.emplace_back(delayed.release(),
484+
{
485+
auto locked_tasks = foreground_delayed_tasks_.Lock();
486+
while (std::unique_ptr<DelayedTask> delayed = locked_tasks.Pop()) {
487+
did_work = true;
488+
uint64_t delay_millis = llround(delayed->timeout * 1000);
489+
490+
delayed->timer.data = static_cast<void*>(delayed.get());
491+
uv_timer_init(loop_, &delayed->timer);
492+
// Timers may not guarantee queue ordering of events with the same delay if
493+
// the delay is non-zero. This should not be a problem in practice.
494+
uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
495+
uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
496+
uv_handle_count_++;
497+
498+
scheduled_delayed_tasks_.emplace_back(delayed.release(),
489499
[](DelayedTask* delayed) {
490-
uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
500+
uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
491501
[](uv_handle_t* handle) {
492-
std::unique_ptr<DelayedTask> task {
493-
static_cast<DelayedTask*>(handle->data) };
494-
task->platform_data->DecreaseHandleCount();
502+
std::unique_ptr<DelayedTask> task {
503+
static_cast<DelayedTask*>(handle->data) };
504+
task->platform_data->DecreaseHandleCount();
505+
});
495506
});
496-
});
507+
}
497508
}
509+
498510
// Move all foreground tasks into a separate queue and flush that queue.
499511
// This way tasks that are posted while flushing the queue will be run on the
500512
// next call of FlushForegroundTasksInternal.
501-
std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
513+
std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.Lock().PopAll();
502514
while (!tasks.empty()) {
503515
std::unique_ptr<Task> task = std::move(tasks.front());
504516
tasks.pop();
@@ -593,68 +605,4 @@ TaskQueue<T>::TaskQueue()
593605
: lock_(), tasks_available_(), tasks_drained_(),
594606
outstanding_tasks_(0), stopped_(false), task_queue_() { }
595607

596-
template <class T>
597-
void TaskQueue<T>::Push(std::unique_ptr<T> task) {
598-
Mutex::ScopedLock scoped_lock(lock_);
599-
outstanding_tasks_++;
600-
task_queue_.push(std::move(task));
601-
tasks_available_.Signal(scoped_lock);
602-
}
603-
604-
template <class T>
605-
std::unique_ptr<T> TaskQueue<T>::Pop() {
606-
Mutex::ScopedLock scoped_lock(lock_);
607-
if (task_queue_.empty()) {
608-
return std::unique_ptr<T>(nullptr);
609-
}
610-
std::unique_ptr<T> result = std::move(task_queue_.front());
611-
task_queue_.pop();
612-
return result;
613-
}
614-
615-
template <class T>
616-
std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
617-
Mutex::ScopedLock scoped_lock(lock_);
618-
while (task_queue_.empty() && !stopped_) {
619-
tasks_available_.Wait(scoped_lock);
620-
}
621-
if (stopped_) {
622-
return std::unique_ptr<T>(nullptr);
623-
}
624-
std::unique_ptr<T> result = std::move(task_queue_.front());
625-
task_queue_.pop();
626-
return result;
627-
}
628-
629-
template <class T>
630-
void TaskQueue<T>::NotifyOfCompletion() {
631-
Mutex::ScopedLock scoped_lock(lock_);
632-
if (--outstanding_tasks_ == 0) {
633-
tasks_drained_.Broadcast(scoped_lock);
634-
}
635-
}
636-
637-
template <class T>
638-
void TaskQueue<T>::BlockingDrain() {
639-
Mutex::ScopedLock scoped_lock(lock_);
640-
while (outstanding_tasks_ > 0) {
641-
tasks_drained_.Wait(scoped_lock);
642-
}
643-
}
644-
645-
template <class T>
646-
void TaskQueue<T>::Stop() {
647-
Mutex::ScopedLock scoped_lock(lock_);
648-
stopped_ = true;
649-
tasks_available_.Broadcast(scoped_lock);
650-
}
651-
652-
template <class T>
653-
std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
654-
Mutex::ScopedLock scoped_lock(lock_);
655-
std::queue<std::unique_ptr<T>> result;
656-
result.swap(task_queue_);
657-
return result;
658-
}
659-
660608
} // namespace node

src/node_platform.h

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,72 @@ class PerIsolatePlatformData;
2222
template <class T>
2323
class TaskQueue {
2424
public:
25+
class Locked {
26+
public:
27+
explicit Locked(TaskQueue* queue)
28+
: queue_(queue), lock_(queue->lock_) {}
29+
30+
void Push(std::unique_ptr<T> task) {
31+
queue_->outstanding_tasks_++;
32+
queue_->task_queue_.push(std::move(task));
33+
queue_->tasks_available_.Signal(lock_);
34+
}
35+
36+
std::unique_ptr<T> Pop() {
37+
if (queue_->task_queue_.empty()) {
38+
return std::unique_ptr<T>(nullptr);
39+
}
40+
std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
41+
queue_->task_queue_.pop();
42+
return result;
43+
}
44+
45+
std::unique_ptr<T> BlockingPop() {
46+
while (queue_->task_queue_.empty() && !queue_->stopped_) {
47+
queue_->tasks_available_.Wait(lock_);
48+
}
49+
if (queue_->stopped_) {
50+
return std::unique_ptr<T>(nullptr);
51+
}
52+
std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
53+
queue_->task_queue_.pop();
54+
return result;
55+
}
56+
57+
void NotifyOfCompletion() {
58+
if (--queue_->outstanding_tasks_ == 0) {
59+
queue_->tasks_drained_.Broadcast(lock_);
60+
}
61+
}
62+
63+
void BlockingDrain() {
64+
while (queue_->outstanding_tasks_ > 0) {
65+
queue_->tasks_drained_.Wait(lock_);
66+
}
67+
}
68+
69+
void Stop() {
70+
queue_->stopped_ = true;
71+
queue_->tasks_available_.Broadcast(lock_);
72+
}
73+
74+
std::queue<std::unique_ptr<T>> PopAll() {
75+
std::queue<std::unique_ptr<T>> result;
76+
result.swap(queue_->task_queue_);
77+
return result;
78+
}
79+
80+
private:
81+
TaskQueue* queue_;
82+
Mutex::ScopedLock lock_;
83+
};
84+
2585
TaskQueue();
2686
~TaskQueue() = default;
2787

28-
void Push(std::unique_ptr<T> task);
29-
std::unique_ptr<T> Pop();
30-
std::unique_ptr<T> BlockingPop();
31-
std::queue<std::unique_ptr<T>> PopAll();
32-
void NotifyOfCompletion();
33-
void BlockingDrain();
34-
void Stop();
88+
Locked Lock() {
89+
return Locked(this);
90+
}
3591

3692
private:
3793
Mutex lock_;
@@ -98,6 +154,9 @@ class PerIsolatePlatformData
98154
void RunForegroundTask(std::unique_ptr<v8::Task> task);
99155
static void RunForegroundTask(uv_timer_t* timer);
100156

157+
uv_async_t* flush_tasks_ = nullptr;
158+
std::atomic<bool> flush_tasks_valid_{true};
159+
101160
struct ShutdownCallback {
102161
void (*cb)(void*);
103162
void* data;
@@ -110,7 +169,6 @@ class PerIsolatePlatformData
110169

111170
v8::Isolate* const isolate_;
112171
uv_loop_t* const loop_;
113-
uv_async_t* flush_tasks_ = nullptr;
114172
TaskQueue<v8::Task> foreground_tasks_;
115173
TaskQueue<DelayedTask> foreground_delayed_tasks_;
116174

0 commit comments

Comments
 (0)