准备
翻开 vm/thread_pool.h 与 vm/thread_pool.cc 两个文件。
开端
众所周知如需求在 Dart 开发中要运用「多线程」的能力那肯定离不开 Isolate
,与传统多线程的概念不同 Dart 语言下的 Isolate
之间进行数据同步与通讯时需求凭借「消息机制」。Isolate
在规划理念上淡化了线程的概念却更接近进程,在运用上与进程间通讯相似但又比进程间通讯更简略。实际上 Isolate
在 Runtime 底层完成仍然还是运用「多线程」的能力,它经过包装与笼统多线程让 Dart 代码能够直接运用 Isolate
来代替多线程,其间「线程池(以下用 ThreadPool 代替)」是这层笼统的根底。根据官方的界说:
在运用 Isolate
时能确保恣意线程不会一起进入多个 Isolate
且同一个 Isolate
不会被多个线程一起运转
Dart Runtime 中的 ThreadPool
不仅用来承载线程(Worker
)还承载了使命(Task
),Worker
代表「顾客」来消费当时的 Task
(引证 MessageHandler
),这两个类型是 ThreadPool
的内置类型本文后面会具体介绍。
假如你看过本专栏第一篇内容应该对此有大致印象,假如没有也没有联系,本文内容相对独不影响对
ThreadPool
的了解。
看源码之前切换到天主视角来看 ThreadPool
的根底效果:
-
C++
Isolate
类型与 Dart 中的Isolate
类型对应 -
每个 C++
Isolate
相关了一个MessageHandler
目标,MessageHandler
保存了当时Isolate
的一切Message
-
Message
包括 Dart 代码传进来的根底数据与port_id
,Message
可来自于当时Isolate
或另一个Isolate
,Message
被消费时会将数据再回调给 Dart 代码(另一个 Dart Isolate 回调) -
Isolate
担任触发ThreadPool
对Task
(Task
会引证MessageHandler
)的创立流程,创立好的Task
会保存到ThreadPool
自身的行列中等候自身Worker
来消费 -
每个
Worker
会确保消费掉一个Task
内的一切Message
后再进入另一个Task
-
多个
Isolate
共享同一个ThreadPool
锁
本末节为 C++ 根底,C++ 大佬可略过
在源码中总是能看到相似下面的代码,函数效果域内又界说了一个内部效果域,从语法层面上看这个内部效果域好像没有什么意义,有没有它也不影响代码的履行逻辑。那它有什么效果呢?不卖关子,这儿其实是 C++ 多线程下「锁」的巧妙用法。
bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
Worker* new_worker = nullptr;
{ // 声明内部效果域
MonitorLocker ml(&pool_monitor_); // 界说变量
if (shutting_down_) {
return false;
}
new_worker = ScheduleTaskLocked(&ml, std::move(task));
} // 效果域完毕
if (new_worker != nullptr) {
new_worker->StartThread();
}
return true;
}
一般情况下咱们在多线程下运用锁时都是直接在临界资源访问前后直接调用加解锁的代码,伪代码示例如下:
int globalCount = 1;
void threadEntry() {
// 直接加锁
thread_lock();
globalCount++;
// 直接解锁
thread_unlock();
}
这种写法很直观,访问临界资源前加锁占用当时资源,避免其它线程再访问当时资源,访问完毕后再解锁开释当时资源。留意这儿的加解锁必定得是一个对称操作,有加锁的代码就必定要对称呈现解锁代码不然问题很严重。但假如要保护的临界资源较长(加解锁保护的代码较长)或许相同的加解锁代码太多又或许有提前 return
边界条件,如何避免解锁代码漏写就比较麻烦了。
C++ 类界说时有结构函数与析构函数,当 new
出一个目标时结构函数会被调用,delete
开释目标时析构函数会被调用,所以当类的暂时变量超过效果域它的析构函数会被调用。正是运用 C++ 的这个特色能够经过增加效果域的办法,来保护临界资源。来一个简略示例:
#include <iostream>
#include <mutex>
// 简化版的互斥锁类
class MyMutex {
public:
MyMutex(std::mutex& mtx) : mutexRef(mtx) {
mutexRef.lock();
}
~MyMutex() {
mutexRef.unlock();
}
private:
std::mutex& mutexRef;
};
int globalCount = 1;
int main() {
// 一个互斥锁
std::mutex _mutex;
// 在效果域中创立 MyMutex 目标,结构函数加锁,离开效果域时析构函数解锁
{
MyMutex myMutex(_mutex);
globalCount++;
}
// MyMutex 目标离开效果域后,互斥锁已被解锁
return 0;
}
回到 ThreadPool
中的锁类型,ThreadPool
中有触及到了 Runtime 中两种锁类型 Monitor
与 MonitorLocker
。从 Monitor
的结构函数与析构函数能够看出,Monitor
正是运用了析构特性完成了超出效果域自动解锁的能力,是对条件锁(也称条件变量)的一层包装。一起 Monitor
也用来屏蔽不同 OS 的锁完成,下面的代码正是在 MacOS/iOS 完成(os_thread_macos.cc)。
条件变量扩展阅读:pthread 条件变量
// 结构函数
Monitor::Monitor() {
pthread_mutexattr_t attr;
int result = pthread_mutexattr_init(&attr);
result = pthread_mutex_init(data_.mutex(), &attr);
result = pthread_mutexattr_destroy(&attr);
result = pthread_cond_init(data_.cond(), nullptr);
}
// 析构函数
Monitor::~Monitor() {
int result = pthread_mutex_destroy(data_.mutex());
result = pthread_cond_destroy(data_.cond());
}
而 MonitorLocker
则更为直接,从类界说来看它仅仅是对 Monitor
进行了二次包装。知识点:在 Dart Runtime 中并不直接运用 Monitor
进行锁操作,而是运用 Monitor
的封装类 MonitorLocker
进行锁操作。
class MonitorLocker {
public:
explicit MonitorLocker(Monitor* monitor) : monitor_(monitor) {
monitor_->Enter();
}
virtual ~MonitorLocker() { monitor_->Exit(); }
Monitor::WaitResult Wait(int64_t millis = Monitor::kNoTimeout) {
return monitor_->Wait(millis);
}
void Notify() { monitor_->Notify(); }
void NotifyAll() { monitor_->NotifyAll(); }
private:
// 对不同操作系统条件变量的封装
Monitor* const monitor_;
}
上面临 ThreadPool 中触及到的锁进行了介绍,信任看到相似的代码后不会再感到困惑。
两个类型
本文最初提到了线程池模型中的生产者(Task
)与顾客(Worker
),整个 ThreadPool
都是环绕这两个类型的行列进行逻辑处理,本末节将重点介绍这两个类型。
Task
Task
被界说在 ThreadPool
中,是 ThreadPool
的内置类型,一起经过查找整个 Runtime 库房可知 Task
也是一个基类,它派生出了不同的子类,每个子类都代表某种使命。
基类 Task
内只要一个函数 Run
, 一起它的结构函数被 protected
润饰,阐明它及它的子类都只能在 ThreadPool
及 ThreadPool
子类中实例化。实际上在 Task
类界说的下方就有 Task
的实例化逻辑,只不过它是用模板(C++ 中的模板相当于泛型)完成。
class ThreadPool {
public:
// 基类 Task 的界说,继承自 IntrusiveDListEntry 阐明它的子类能够进行列队操作
class Task : public IntrusiveDListEntry<Task> {
protected:
Task() {}
public:
virtual ~Task() {}
// 虚函数,由子担任类完成
virtual void Run() = 0;
};
// 模板(泛型)函数,担任实例化 Task 子类
template <typename T, typename... Args>
bool Run(Args&&... args) {
return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
}
private:
using TaskList = IntrusiveDList<Task>;
TaskList tasks_;
}
Task
子类实例化后便调用了 ThreadPool::RunImpl
办法,MessageHandler
正是由此触发了 Task
的创立流程。Task
子类很多,这儿咱们暂时只重视 MessageHandler
相关的部分,也便是 MessageHandlerTask
,能够看看它的完成。
Worker
Worker
字面意思有「东西人」的味道,从类界说来看它持有当时线程(os_thread_
成员变量),且有一个 Main
静态函数,阐明这个 Main
是线程的进口函数。
class ThreadPool {
public:
private:
class Worker : public IntrusiveDListEntry<Worker> {
public:
explicit Worker(ThreadPool* pool);
void StartThread();
private:
friend class ThreadPool;
// 线程创立后的进口函数
static void Main(uword args);
ThreadPool* pool_;
ThreadJoinId join_id_;
// 持有当时函数
OSThread* os_thread_ = nullptr;
};
using WorkerList = IntrusiveDList<Worker>;
// 不同状况的 Worker 行列
WorkerList running_workers_;
WorkerList idle_workers_;
WorkerList dead_workers_;
}
经过 Worker
的完成可知, Main
函数在 ThreadPool::Worker::StartThread
办法中被传入操作系统线程开端履行。
void ThreadPool::Worker::StartThread() {
int result = OSThread::Start("DartWorker", &Worker::Main,
reinterpret_cast<uword>(this));
// 省掉 result 判别
}
仍然以 MacOS/iOS 系统渠道代码为例,线程创立的代码如下所示(对源码略有简化)。从源码来看,线程的创立并没有任何特别处理。
int OSThread::Start(const char* name,
ThreadStartFunction function,
uword parameter) {
// ... 省掉其它代码
// ThreadPool::Worker::Main 函数与参数保存在 data 目标中
ThreadStartData* data = new ThreadStartData(name, function, parameter);
pthread_t tid;
// 创立线程
result = pthread_create(&tid, &attr, ThreadStart, data);
// ... 省掉其它代码
return 0;
}
// Worker 优先级全局常量,默许值为:kMinInt
int FLAG_worker_thread_priority = Flags::Register_int(&FLAG_worker_thread_priority, "worker_thread_priority", kMinInt, "The thread priority the VM should use for new worker threads.");
static void* ThreadStart(void* data_ptr) {
// 假如优先级不为 kMinInt 时则设置线程优选级
if (FLAG_worker_thread_priority != kMinInt) {
// 这儿的 FLAG_worker_thread_priority 全局变量默许值为 kMinInt
// 所以优选级不会永远不会被设置
const pthread_t thread = pthread_self();
int policy = SCHED_FIFO;
struct sched_param schedule;
pthread_getschedparam(thread, &policy, &schedule);
schedule.sched_priority = FLAG_worker_thread_priority;
pthread_setschedparam(thread, policy, &schedule);
}
// 取出 ThreadPool::Worker 内的静态 Main 函数与参数
OSThread::ThreadStartFunction function = data->function();
uword parameter = data->parameter();
// 调用 ThreadPool::Worker::Main 函数
function(parameter);
return nullptr;
}
一切线程创立后均运用默许优先级(pthread_create
创立的线程默许优先级为 0),阐明 Dart 还没有针对 Apple M 系列的芯片做针对性的功能优化,这可能会使 Dart 在核算密集型的场景处于晦气方位。因为根据少数派这篇文章所述,Apple M 系列芯片运用大小核架构(大核:功能中心简称 P 核,小核:效能中心简称 E 核),优先级低的线程只会分配到 E 中心上,只要当 E 中心分配满了才会分配 P 中心。
扩展阅读:M1 CPU 那么多的核,macOS 是怎样办理的?。虽然这篇文章所述的优先级均是 QoS (NSOperation)优先级,但根据苹果的 Prioritize Work with Quality of Service Classes 文档与 XNU 源码 可知 QoS 与 pthread 优先级存在映身联系。
假如你的 Dart 应用(包括 Flutter 桌面 App,甚至 Dart 编译前端)对功能有更高要求,理论上能够测验更改 FLAG_worker_thread_priority
的默许值(如:63)然后从头编译 Dart SDK,让 MacOS 操作系统强制优先分配 P 中心来提升功能。(因为我这边没有 M 芯片 Mac 无法做验证,假如你做了相关验证请必定要让我知道 )
相似思路:假如需求优化 Flutter App 的发动功能也能够更改这个优先级变量,参阅根据:不改一行事务代码,飞书 iOS 低端机发动优化实践
中心
上面介绍完了 ThreadPool
的根底知识,假如有 C/C++ 根底知识其实就能完全看懂这部分代码了,这儿只对对一些中心细节进行具体阐明。
进口
如前所述线程池是生产者与顾客模型,生产者经过 ThreadPool::Run
函数触发 Task
派生子类型的创立,然后会调用到 ThreadPool::RunImpl
函数。这儿 ThreadPool::RunImpl
便是线程池的真正「进口」。
// ThreadPool 进口
template <typename T, typename... Args>
bool Run(Args&&... args) {
return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
}
bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
Worker* new_worker = nullptr;
{
MonitorLocker ml(&pool_monitor_);
if (shutting_down_) {
return false;
}
// Task 与(潜在的)Worker 创立
new_worker = ScheduleTaskLocked(&ml, std::move(task));
}
if (new_worker != nullptr) {
// 假如创立了闲暇 Worker 就发动它
new_worker->StartThread();
}
return true;
}
// ThreadPool 进口运用办法以 MessageHandler 中对线程池的运用做为示例
bool MessageHandler::Run(ThreadPool* pool,
StartCallback start_callback,
EndCallback end_callback,
CallbackData data) {
// ...省掉
// message_handle.cc 中对线程池进口的调用,this 参数是当时 MessageHandler 目标
// MessageHandlerTask 泛型指定生成的 Task 类型为 MessageHandlerTask
pool_->Run<MessageHandlerTask>(this);
// ...省掉
}
发动
ThreadPool::RunImpl
办法内部经过调用 ThreadPool::ScheduleTaskLocked
办法来担任 Worker
与 Task
的创立。Task
直接创立且只要一个显式的状况: Pending(经过 pending_tasks_
变量保护),一个 Task
不是在 Pending 状况便是在运转状况。而 Worker
创立的进程中会判别当时存活的线程数量(也便是 Worker
行列数量)是否超过最大约束(max_pool_size_
),假如超过约束则测验唤醒闲暇线程(Worker
)。
Worker 的唤醒机制是经过向条件变量发送告诉来唤醒经过 WaitMicros 办法进入阻塞状况的线程。关于「条件变量」运用可查阅相关文章进行了解,例如这篇。
ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
std::unique_ptr<Task> task) {
// 将 task 增加到行列并记录待运转 task 数量
tasks_.Append(task.release());
pending_tasks_++;
// 假如闲暇线程大于等于 pending 状况 task 数量,则优先唤醒闲暇线程
if (count_idle_ >= pending_tasks_) {
ml->Notify();
return nullptr;
}
// 正在运转与闲暇的线程数超过最大线程数约束,则优先唤醒闲暇线程
if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) {
if (!idle_workers_.IsEmpty()) {
ml->Notify();
}
return nullptr;
}
// 不然直接创立闲暇的 Worker 并返回
auto new_worker = new Worker(this);
idle_workers_.Append(new_worker);
count_idle_++;
return new_worker;
}
正如你所想,Worker
除了有闲暇状况(Idle)、还有运转状况(Running)、逝世状况(Dead),三者之间的转换联系如下:
ThreadPool
中与之对应的状况改变办法分别是:
办法 | 效果 |
---|---|
ThreadPool::IdleToRunningLocked | 闲暇转运转 |
ThreadPool::RunningToIdleLocked | 运转转闲暇 |
ThreadPool::IdleToDeadLocked | 闲暇转逝世 |
状况改变操作仅仅只是将 Worker
在不同的行列中移动并改变状况计数,以 ThreadPool::IdleToRunningLocked
办法为例:
// Worker 从闲暇转移到运转状况
void ThreadPool::IdleToRunningLocked(Worker* worker) {
// 从闲暇行列中移除
idle_workers_.Remove(worker);
// 增加到运转状况行列
running_workers_.Append(worker);
// 保护状况变量
count_idle_--;
count_running_++;
}
Worker
创立出来后默许是 Idle 状况,ThreadPool::ScheduleTaskLocked
办法内被创立出来后立即履行了 ThreadPool::StartThread
办法来发动 Worker
(即开启新线程调用 ThreadPool::Worker::Main
办法),发动后其状况会变成 Running 状况。发动的具体进程如上节「Worker 类型」介绍所述。
// 在新线程内运转 Worker::Main 办法,this 参数为当时 ThreadPool 目标,最终会传入 Main 办法内
void ThreadPool::Worker::StartThread() {
int result = OSThread::Start("DartWorker", &Worker::Main,
reinterpret_cast<uword>(this));
}
这儿需求重视的是 ThreadPool::Worker::Main
函数内部完成,并且 ThreadPool::Worker::Main
函数的履行是在新的线程,新的线程,新的线程内,已与「进口」函数所在线程不同。它的中心是经过 ThreadPool::WorkerLoop
在新线程内消费当时 ThreadPool
内保存的 Pending 状况的 Task
行列。
// 源码略有简化
void ThreadPool::Worker::Main(uword args) {
// 获取当时 OSThread 目标(OSThread 是 Runtime 对渠道线程的笼统,保存在当时线程的 TLS)
OSThread* os_thread = OSThread::Current();
// 将传过来的参数转换为 ThreadPool 目标
Worker* worker = reinterpret_cast<Worker*>(args);
ThreadPool* pool = worker->pool_;
// 将 Worker 与 OSThread 彼此相关
os_thread->owning_thread_pool_worker_ = worker;
worker->os_thread_ = os_thread;
// 保存 join_id 用于资源整理
worker->join_id_ = OSThread::GetCurrentThreadJoinId(os_thread);
// 开端消费 Task 循环
pool->WorkerLoop(worker);
// 退出循环整理绑定联系
worker->os_thread_ = nullptr;
os_thread->owning_thread_pool_worker_ = nullptr;
}
整个 ThreadPool
内中心中的中心便是 ThreadPool::WorkerLoop
,它担任来消费处于 Pending 状况的 Task
。整个办法主体只要一个 while
循环,留意循环效果域最初的 MonitorLocker
,它是一个条件变量互斥锁(详情参阅上面 锁 末节的介绍),效果域内加锁,离开效果域后解锁。MonitorLocker
变量的存在确保了多个线程不会进入一起进入同一个 Task
,即多个线程不会一起进入同一个 Isolate
。
下面这段代码看起来长,但信任我它并不杂乱请必定耐性看完。
void ThreadPool::WorkerLoop(Worker* worker) {
// 在当时线程中搜集 dead 状况的 worker
WorkerList dead_workers_to_join;
while (true) {
// 声明线程锁并加锁,该锁在变量离开效果域后解锁
MonitorLocker ml(&pool_monitor_);
// Pending 使命行列不为空,进入内部循环
if (!tasks_.IsEmpty()) {
// 将当时 worker 的状况转移至 Running
IdleToRunningLocked(worker);
// 消费 task_ 直到列表为空
while (!tasks_.IsEmpty()) {
// 将 task_ 从行列中取出
std::unique_ptr<Task> task(tasks_.RemoveFirst());
// 减少 Pending task 数量
pending_tasks_--;
// 对上面声名的锁暂时解锁,答应其它线程能够继续消费其它 task_
MonitorLeaveScope mls(&ml);
// 运转 task_,消费内部的 Message
task->Run();
ASSERT(Isolate::Current() == nullptr);
// task 指针置空
task.reset();
}
// task_ 行列为空后将当时 worker 的状况转移至 Idle
RunningToIdleLocked(worker);
}
// 一切线程都闲暇时整个线程池进入闲暇状况
if (running_workers_.IsEmpty()) {
OnEnterIdleLocked(&ml);
if (!tasks_.IsEmpty()) {
continue;
}
}
// 假如线程池封闭则将当时 worker 转移到 Dead 状况
if (shutting_down_) {
// 搜集之前其它线程现已 Dead 的 worker
ObtainDeadWorkersLocked(&dead_workers_to_join);
IdleToDeadLocked(worker);
break;
}
// 下面的代码中心逻辑是将当时线程挂起,在挂起的时间内等候被唤醒
// 因为挂起前 worker 已进入 Idle 状况,假如等候超时,则当时线程进会进入 Dead 状况
const int64_t idle_start = OS::GetCurrentMonotonicMicros();
bool done = false;
while (!done) {
// 线程默许挂起时长为 5 秒
const auto result = ml.WaitMicros(ComputeTimeout(idle_start));
if (!tasks_.IsEmpty()) break;
if (shutting_down_ || result == Monitor::kTimedOut) {
done = true;
break;
}
}
// 假如超时或封闭
if (done) {
// 搜集之前其它线程现已 Dead 的 worker
ObtainDeadWorkersLocked(&dead_workers_to_join);
// 将当时 worker 置于 Dead 状况
IdleToDeadLocked(worker);
break;
}
}
// 如搜集到的 Dead 状况 Worker 不为空,则等候它们完毕后再完毕当时线程
JoinDeadWorkersLocked(&dead_workers_to_join);
}
留意 while (!tasks_.IsEmpty())
循环的存在,它标明当时线程会消费 tasks_
行列中一切 Task
,一起经过 MonitorLeaveScope
将线程锁暂时解锁,也给其它线程来消费 tasks_
行列的机会。
所谓消费 Task
便是履行其 Run
办法,Run
办法处理完一切 Message
才完毕
在消费 Task
前后会改变 Worker
的状况(Running/Idle),Worker
进入 Idle
状况后马上会被挂起,直到超时或被唤醒。超时后会进入 Dead 状况,唤醒则变成 Running 状况然后继续消费 Task
。正是由这套状况机制的确保了一个线程(Worker
)不会一起进入两个 Isolate
。
One More Thing
还有几个值得留意的细节是整个 Runtime 内并不是只要一个线程池实例,实际上 ThreadPool
还有一个派生类型 MutatorThreadPool
,所以整个 Dart Runtime 只要两个线程池实例。MutatorThreadPool
类型的实例用来运转 Dart 代码,而 ThreadPool
类型的线程池用来做内存的 GC 操作或编译等辅助作业。并且在线程池数量约束上也有所不同,MutatorThreadPool
类型线程池默许最大线程数量是 8,而 ThreadPool
类型对线程数量没有约束。
另外线程挂起的默许超时时长是 5 秒,只要线程在 5 秒内被唤醒它仍然会苟活于世。至于为什么是 5 秒不是 10 秒,我也不知道,假如你知道勿必告诉我
ThreadPool
相关的知识点不杂乱,了解起来不会有太多阻止。中心思想仍然是传统的生产者与顾客形式,再加上针对不同渠道的线程笼统结合生命周期界说组成了整程线池的中心逻辑。