
Commit 4df6983

codebytereaduh95 authored and committed
src: improve thread safety of TaskQueue
PR-URL: #57910
Reviewed-By: Ben Noordhuis <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Joyee Cheung <[email protected]>
1 parent 10c7a75 commit 4df6983
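
This commit changes TaskQueue<T> so that callers no longer invoke Push()/Pop()/etc. directly. Instead they call TaskQueue::Lock(), which returns a Locked guard that holds the queue's mutex for its lifetime, and perform all operations through that guard. The sketch below shows the two call-site shapes that recur throughout the diff; the function names are hypothetical and only the TaskQueue/Locked API itself comes from the patch.

```cpp
// Illustrative only: hypothetical call sites using the new TaskQueue API.
void PostOneShot(TaskQueue<v8::Task>& queue, std::unique_ptr<v8::Task> task) {
  // The temporary Locked guard holds the mutex just for this full expression.
  queue.Lock().Push(std::move(task));
}

void PostIfStillOpen(TaskQueue<v8::Task>& queue,
                     std::unique_ptr<v8::Task> task,
                     uv_async_t* flush_handle) {
  // Keeping the guard in a local groups the state check and the push into a
  // single critical section, which is the race the patch closes.
  auto locked = queue.Lock();
  if (flush_handle == nullptr) return;
  locked.Push(std::move(task));
}
```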

File tree

src/node_platform.cc
src/node_platform.h

2 files changed: +130 -102 lines


src/node_platform.cc

Lines changed: 106 additions & 94 deletions
@@ -39,9 +39,10 @@ static void PlatformWorkerThread(void* data) {
     worker_data->platform_workers_ready->Signal(lock);
   }
 
-  while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
+  while (std::unique_ptr<Task> task =
+             pending_worker_tasks->Lock().BlockingPop()) {
     task->Run();
-    pending_worker_tasks->NotifyOfCompletion();
+    pending_worker_tasks->Lock().NotifyOfCompletion();
   }
 }
 
@@ -72,13 +73,15 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
   }
 
   void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
-    tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
-                                               delay_in_seconds));
+    auto locked = tasks_.Lock();
+    locked.Push(std::make_unique<ScheduleTask>(
+        this, std::move(task), delay_in_seconds));
     uv_async_send(&flush_tasks_);
   }
 
   void Stop() {
-    tasks_.Push(std::make_unique<StopTask>(this));
+    auto locked = tasks_.Lock();
+    locked.Push(std::make_unique<StopTask>(this));
     uv_async_send(&flush_tasks_);
   }
 
@@ -99,8 +102,14 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
   static void FlushTasks(uv_async_t* flush_tasks) {
     DelayedTaskScheduler* scheduler =
         ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
-    while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
+
+    std::queue<std::unique_ptr<Task>> tasks_to_run =
+        scheduler->tasks_.Lock().PopAll();
+    while (!tasks_to_run.empty()) {
+      std::unique_ptr<Task> task = std::move(tasks_to_run.front());
+      tasks_to_run.pop();
       task->Run();
+    }
   }
 
   class StopTask : public Task {
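
The rewritten FlushTasks() above is an instance of the drain-then-run idiom this commit uses: PopAll() moves the whole backlog out of the queue in one critical section, and the tasks are executed after the temporary guard has been destroyed, so no queue lock is held while task->Run() executes. A minimal generic sketch of the idiom, with a hypothetical helper name and assuming the patched TaskQueue<T>:

```cpp
// Hypothetical helper, not part of the patch: drain a TaskQueue under one
// lock acquisition, then run the tasks without holding the lock.
template <typename T>
void DrainAndRun(TaskQueue<T>* queue) {
  // The temporary Locked guard lives only for this full expression.
  std::queue<std::unique_ptr<T>> to_run = queue->Lock().PopAll();
  while (!to_run.empty()) {
    std::unique_ptr<T> task = std::move(to_run.front());
    to_run.pop();
    task->Run();  // anything posted from inside Run() lands in the shared
  }               // queue and is picked up by the next drain
}
```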
@@ -148,7 +157,8 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
   static void RunTask(uv_timer_t* timer) {
     DelayedTaskScheduler* scheduler =
         ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
-    scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
+    scheduler->pending_worker_tasks_->Lock().Push(
+        scheduler->TakeTimerTask(timer));
   }
 
   std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
@@ -202,7 +212,7 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
 }
 
 void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
-  pending_worker_tasks_.Push(std::move(task));
+  pending_worker_tasks_.Lock().Push(std::move(task));
 }
 
 void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
@@ -211,11 +221,11 @@ void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
 }
 
 void WorkerThreadsTaskRunner::BlockingDrain() {
-  pending_worker_tasks_.BlockingDrain();
+  pending_worker_tasks_.Lock().BlockingDrain();
 }
 
 void WorkerThreadsTaskRunner::Shutdown() {
-  pending_worker_tasks_.Stop();
+  pending_worker_tasks_.Lock().Stop();
   delayed_task_scheduler_->Stop();
   for (size_t i = 0; i < threads_.size(); i++) {
     CHECK_EQ(0, uv_thread_join(threads_[i].get()));
@@ -250,27 +260,25 @@ void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
 }
 
 void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
-  if (flush_tasks_ == nullptr) {
-    // V8 may post tasks during Isolate disposal. In that case, the only
-    // sensible path forward is to discard the task.
-    return;
-  }
-  foreground_tasks_.Push(std::move(task));
+  // The task can be posted from any V8 background worker thread, even when
+  // the foreground task runner is being cleaned up by Shutdown(). In that
+  // case, make sure we wait until the shutdown is completed (which leads
+  // to flush_tasks_ == nullptr, and the task will be discarded).
+  auto locked = foreground_tasks_.Lock();
+  if (flush_tasks_ == nullptr) return;
+  locked.Push(std::move(task));
   uv_async_send(flush_tasks_);
 }
 
 void PerIsolatePlatformData::PostDelayedTask(
     std::unique_ptr<Task> task, double delay_in_seconds) {
-  if (flush_tasks_ == nullptr) {
-    // V8 may post tasks during Isolate disposal. In that case, the only
-    // sensible path forward is to discard the task.
-    return;
-  }
+  auto locked = foreground_delayed_tasks_.Lock();
+  if (flush_tasks_ == nullptr) return;
   std::unique_ptr<DelayedTask> delayed(new DelayedTask());
   delayed->task = std::move(task);
   delayed->platform_data = shared_from_this();
   delayed->timeout = delay_in_seconds;
-  foreground_delayed_tasks_.Push(std::move(delayed));
+  locked.Push(std::move(delayed));
   uv_async_send(flush_tasks_);
 }
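
The important change in PostTask()/PostDelayedTask() is that the flush_tasks_ check now happens with the corresponding queue lock held. Shutdown() (next hunk) clears the queues and resets flush_tasks_ while holding those same locks, so a posting thread can no longer pass the check, lose the race to Shutdown(), and then push into a queue that will never be flushed. The block below repeats the new PostTask() body from the hunk above, with the old, racy interleaving sketched in comments; it is an illustration rather than additional patch code.

```cpp
// Old shape: the check and the push were separate critical sections.
//   poster thread                      main thread
//   if (flush_tasks_ != nullptr) ...
//                                      Shutdown(): PopAll(); flush_tasks_ = nullptr;
//   foreground_tasks_.Push(task);      // task is stranded and never flushed
//
// New shape: the guard serializes the poster against Shutdown().
auto locked = foreground_tasks_.Lock();  // blocks while Shutdown() holds the lock
if (flush_tasks_ == nullptr) return;     // shutdown finished first: discard task
locked.Push(std::move(task));            // otherwise enqueue before Shutdown() can run
uv_async_send(flush_tasks_);
```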

@@ -294,32 +302,30 @@ void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
 }
 
 void PerIsolatePlatformData::Shutdown() {
-  if (flush_tasks_ == nullptr)
-    return;
+  auto foreground_tasks_locked = foreground_tasks_.Lock();
+  auto foreground_delayed_tasks_locked = foreground_delayed_tasks_.Lock();
 
-  // While there should be no V8 tasks in the queues at this point, it is
-  // possible that Node.js-internal tasks from e.g. the inspector are still
-  // lying around. We clear these queues and ignore the return value,
-  // effectively deleting the tasks instead of running them.
-  foreground_delayed_tasks_.PopAll();
-  foreground_tasks_.PopAll();
+  foreground_delayed_tasks_locked.PopAll();
+  foreground_tasks_locked.PopAll();
   scheduled_delayed_tasks_.clear();
 
-  // Both destroying the scheduled_delayed_tasks_ lists and closing
-  // flush_tasks_ handle add tasks to the event loop. We keep a count of all
-  // non-closed handles, and when that reaches zero, we inform any shutdown
-  // callbacks that the platform is done as far as this Isolate is concerned.
-  self_reference_ = shared_from_this();
-  uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
-           [](uv_handle_t* handle) {
-    std::unique_ptr<uv_async_t> flush_tasks {
-        reinterpret_cast<uv_async_t*>(handle) };
-    PerIsolatePlatformData* platform_data =
-        static_cast<PerIsolatePlatformData*>(flush_tasks->data);
-    platform_data->DecreaseHandleCount();
-    platform_data->self_reference_.reset();
-  });
-  flush_tasks_ = nullptr;
+  if (flush_tasks_ != nullptr) {
+    // Both destroying the scheduled_delayed_tasks_ lists and closing
+    // flush_tasks_ handle add tasks to the event loop. We keep a count of all
+    // non-closed handles, and when that reaches zero, we inform any shutdown
+    // callbacks that the platform is done as far as this Isolate is concerned.
+    self_reference_ = shared_from_this();
+    uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
+             [](uv_handle_t* handle) {
+               std::unique_ptr<uv_async_t> flush_tasks{
+                   reinterpret_cast<uv_async_t*>(handle)};
+               PerIsolatePlatformData* platform_data =
+                   static_cast<PerIsolatePlatformData*>(flush_tasks->data);
+               platform_data->DecreaseHandleCount();
+               platform_data->self_reference_.reset();
+             });
+    flush_tasks_ = nullptr;
+  }
 }
 
 void PerIsolatePlatformData::DecreaseHandleCount() {
@@ -465,39 +471,48 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
 bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
   bool did_work = false;
 
-  while (std::unique_ptr<DelayedTask> delayed =
-      foreground_delayed_tasks_.Pop()) {
+  std::queue<std::unique_ptr<DelayedTask>> delayed_tasks_to_schedule =
+      foreground_delayed_tasks_.Lock().PopAll();
+  while (!delayed_tasks_to_schedule.empty()) {
+    std::unique_ptr<DelayedTask> delayed =
+        std::move(delayed_tasks_to_schedule.front());
+    delayed_tasks_to_schedule.pop();
+
     did_work = true;
     uint64_t delay_millis = llround(delayed->timeout * 1000);
 
     delayed->timer.data = static_cast<void*>(delayed.get());
     uv_timer_init(loop_, &delayed->timer);
-    // Timers may not guarantee queue ordering of events with the same delay if
-    // the delay is non-zero. This should not be a problem in practice.
+    // Timers may not guarantee queue ordering of events with the same delay
+    // if the delay is non-zero. This should not be a problem in practice.
     uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
     uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
     uv_handle_count_++;
 
-    scheduled_delayed_tasks_.emplace_back(delayed.release(),
-                                          [](DelayedTask* delayed) {
-      uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
-               [](uv_handle_t* handle) {
-        std::unique_ptr<DelayedTask> task {
-            static_cast<DelayedTask*>(handle->data) };
-        task->platform_data->DecreaseHandleCount();
-      });
-    });
+    scheduled_delayed_tasks_.emplace_back(
+        delayed.release(), [](DelayedTask* delayed) {
+          uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
+                   [](uv_handle_t* handle) {
+                     std::unique_ptr<DelayedTask> task{
+                         static_cast<DelayedTask*>(handle->data)};
+                     task->platform_data->DecreaseHandleCount();
+                   });
+        });
+  }
+
+  std::queue<std::unique_ptr<Task>> tasks;
+  {
+    auto locked = foreground_tasks_.Lock();
+    tasks = locked.PopAll();
   }
-  // Move all foreground tasks into a separate queue and flush that queue.
-  // This way tasks that are posted while flushing the queue will be run on the
-  // next call of FlushForegroundTasksInternal.
-  std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
+
   while (!tasks.empty()) {
     std::unique_ptr<Task> task = std::move(tasks.front());
     tasks.pop();
     did_work = true;
     RunForegroundTask(std::move(task));
   }
+
   return did_work;
 }
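
In the foreground flush above, the pending tasks are moved into a local std::queue inside a small block so the Locked guard is destroyed before RunForegroundTask() is called. Node's Mutex is a plain (non-recursive) libuv mutex wrapper, so running tasks while still holding the guard would presumably deadlock as soon as a task called PostTask() on the same isolate; with the guard released, such a task is simply queued for the next flush, preserving the previous behaviour. The relevant shape, repeated from the hunk with the rationale spelled out in comments:

```cpp
std::queue<std::unique_ptr<Task>> tasks;
{
  auto locked = foreground_tasks_.Lock();  // guard scope is just the swap
  tasks = locked.PopAll();
}  // lock released here, before any task runs
while (!tasks.empty()) {
  std::unique_ptr<Task> task = std::move(tasks.front());
  tasks.pop();
  did_work = true;
  RunForegroundTask(std::move(task));  // no TaskQueue lock held: a task that
                                       // posts more work just schedules it
                                       // for the next flush
}
```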

@@ -587,66 +602,63 @@ TaskQueue<T>::TaskQueue()
       outstanding_tasks_(0), stopped_(false), task_queue_() { }
 
 template <class T>
-void TaskQueue<T>::Push(std::unique_ptr<T> task) {
-  Mutex::ScopedLock scoped_lock(lock_);
-  outstanding_tasks_++;
-  task_queue_.push(std::move(task));
-  tasks_available_.Signal(scoped_lock);
+TaskQueue<T>::Locked::Locked(TaskQueue* queue)
+    : queue_(queue), lock_(queue->lock_) {}
+
+template <class T>
+void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task) {
+  queue_->outstanding_tasks_++;
+  queue_->task_queue_.push(std::move(task));
+  queue_->tasks_available_.Signal(lock_);
 }
 
 template <class T>
-std::unique_ptr<T> TaskQueue<T>::Pop() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  if (task_queue_.empty()) {
+std::unique_ptr<T> TaskQueue<T>::Locked::Pop() {
+  if (queue_->task_queue_.empty()) {
     return std::unique_ptr<T>(nullptr);
   }
-  std::unique_ptr<T> result = std::move(task_queue_.front());
-  task_queue_.pop();
+  std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
+  queue_->task_queue_.pop();
   return result;
 }
 
 template <class T>
-std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  while (task_queue_.empty() && !stopped_) {
-    tasks_available_.Wait(scoped_lock);
+std::unique_ptr<T> TaskQueue<T>::Locked::BlockingPop() {
+  while (queue_->task_queue_.empty() && !queue_->stopped_) {
+    queue_->tasks_available_.Wait(lock_);
   }
-  if (stopped_) {
+  if (queue_->stopped_) {
     return std::unique_ptr<T>(nullptr);
   }
-  std::unique_ptr<T> result = std::move(task_queue_.front());
-  task_queue_.pop();
+  std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
+  queue_->task_queue_.pop();
   return result;
 }
 
 template <class T>
-void TaskQueue<T>::NotifyOfCompletion() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  if (--outstanding_tasks_ == 0) {
-    tasks_drained_.Broadcast(scoped_lock);
+void TaskQueue<T>::Locked::NotifyOfCompletion() {
+  if (--queue_->outstanding_tasks_ == 0) {
+    queue_->tasks_drained_.Broadcast(lock_);
   }
 }
 
 template <class T>
-void TaskQueue<T>::BlockingDrain() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  while (outstanding_tasks_ > 0) {
-    tasks_drained_.Wait(scoped_lock);
+void TaskQueue<T>::Locked::BlockingDrain() {
+  while (queue_->outstanding_tasks_ > 0) {
+    queue_->tasks_drained_.Wait(lock_);
   }
 }
 
 template <class T>
-void TaskQueue<T>::Stop() {
-  Mutex::ScopedLock scoped_lock(lock_);
-  stopped_ = true;
-  tasks_available_.Broadcast(scoped_lock);
+void TaskQueue<T>::Locked::Stop() {
+  queue_->stopped_ = true;
+  queue_->tasks_available_.Broadcast(lock_);
 }
 
 template <class T>
-std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
-  Mutex::ScopedLock scoped_lock(lock_);
+std::queue<std::unique_ptr<T>> TaskQueue<T>::Locked::PopAll() {
   std::queue<std::unique_ptr<T>> result;
-  result.swap(task_queue_);
+  result.swap(queue_->task_queue_);
   return result;
 }
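
The template implementation above becomes simple once the guard owns the Mutex::ScopedLock. For readers outside the Node.js tree, the same shape can be reproduced with standard-library primitives; the sketch below is a self-contained approximation (std::mutex/std::condition_variable standing in for Node's Mutex/ConditionVariable, and only Push/BlockingPop/Stop mirrored), not the actual node_platform.cc code.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>

template <class T>
class LockedTaskQueue {
 public:
  class Locked {
   public:
    void Push(std::unique_ptr<T> task) {
      queue_->items_.push(std::move(task));
      queue_->available_.notify_one();
    }
    std::unique_ptr<T> BlockingPop() {
      // Waiting releases the mutex and re-acquires it before returning.
      queue_->available_.wait(lock_, [this] {
        return queue_->stopped_ || !queue_->items_.empty();
      });
      if (queue_->stopped_) return nullptr;
      std::unique_ptr<T> result = std::move(queue_->items_.front());
      queue_->items_.pop();
      return result;
    }
    void Stop() {
      queue_->stopped_ = true;
      queue_->available_.notify_all();
    }

   private:
    friend class LockedTaskQueue;
    explicit Locked(LockedTaskQueue* queue)
        : queue_(queue), lock_(queue->mutex_) {}  // mutex held for guard lifetime

    LockedTaskQueue* queue_;
    std::unique_lock<std::mutex> lock_;
  };

  Locked Lock() { return Locked(this); }

 private:
  std::mutex mutex_;
  std::condition_variable available_;
  std::queue<std::unique_ptr<T>> items_;
  bool stopped_ = false;
};
```

Usage mirrors the call sites in the diff: `q.Lock().Push(std::move(task))` for a single operation, or keep the returned guard in a local variable to group several operations under one lock.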

src/node_platform.h

Lines changed: 24 additions & 8 deletions
@@ -22,16 +22,28 @@ class PerIsolatePlatformData;
 template <class T>
 class TaskQueue {
  public:
+  class Locked {
+   public:
+    void Push(std::unique_ptr<T> task);
+    std::unique_ptr<T> Pop();
+    std::unique_ptr<T> BlockingPop();
+    void NotifyOfCompletion();
+    void BlockingDrain();
+    void Stop();
+    std::queue<std::unique_ptr<T>> PopAll();
+
+   private:
+    friend class TaskQueue;
+    explicit Locked(TaskQueue* queue);
+
+    TaskQueue* queue_;
+    Mutex::ScopedLock lock_;
+  };
+
   TaskQueue();
   ~TaskQueue() = default;
 
-  void Push(std::unique_ptr<T> task);
-  std::unique_ptr<T> Pop();
-  std::unique_ptr<T> BlockingPop();
-  std::queue<std::unique_ptr<T>> PopAll();
-  void NotifyOfCompletion();
-  void BlockingDrain();
-  void Stop();
+  Locked Lock() { return Locked(this); }
 
  private:
   Mutex lock_;
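
One design point in the declaration above: Locked's constructor is private and TaskQueue is its only friend, so the only way to obtain a guard is TaskQueue::Lock(). Every queue operation therefore has to go through a Locked object, which makes it hard to touch the underlying queue without lock_ held. For example (illustrative snippet, not from the patch):

```cpp
TaskQueue<v8::Task> queue;
// TaskQueue<v8::Task>::Locked guard(&queue);  // ill-formed: constructor is private
auto guard = queue.Lock();                     // the only way to obtain a guard
```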
@@ -90,6 +102,8 @@ class PerIsolatePlatformData :
   void RunForegroundTask(std::unique_ptr<v8::Task> task);
   static void RunForegroundTask(uv_timer_t* timer);
 
+  uv_async_t* flush_tasks_ = nullptr;
+
   struct ShutdownCallback {
     void (*cb)(void*);
     void* data;
@@ -102,7 +116,9 @@ class PerIsolatePlatformData :
 
   v8::Isolate* const isolate_;
   uv_loop_t* const loop_;
-  uv_async_t* flush_tasks_ = nullptr;
+
+  // When acquiring locks for both task queues, lock foreground_tasks_
+  // first then foreground_delayed_tasks_ to avoid deadlocks.
   TaskQueue<v8::Task> foreground_tasks_;
   TaskQueue<DelayedTask> foreground_delayed_tasks_;
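
The ordering comment above exists because Shutdown() now takes both queue locks and holds them at the same time. If any other path acquired the two locks in the opposite order, the usual ABBA deadlock could occur. A generic illustration with std::mutex stand-ins (not the node::Mutex type used here; the function names are hypothetical):

```cpp
#include <mutex>

std::mutex foreground_tasks_mutex;          // stands in for foreground_tasks_.lock_
std::mutex foreground_delayed_tasks_mutex;  // stands in for foreground_delayed_tasks_.lock_

void FollowsDocumentedOrder() {
  std::lock_guard<std::mutex> a(foreground_tasks_mutex);
  std::lock_guard<std::mutex> b(foreground_delayed_tasks_mutex);
}

void OppositeOrder() {  // run concurrently with the above, both threads can block forever
  std::lock_guard<std::mutex> b(foreground_delayed_tasks_mutex);
  std::lock_guard<std::mutex> a(foreground_tasks_mutex);
}
```

Sticking to a single documented acquisition order, as the comment requires, rules this out.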
