开源项目Workflow中有一个十分重要的根底模块:代码仅300行的C言语线程池。
逻辑齐备的三个特色在第3部分开始解说,欢迎跳阅,或直接到Github主页上围观代码。
github.com/sogou/workf…
0 – Workflow的thrdpool
Workflow的大招:计算通信融为一体的异步调度模式,而计算的中心:Executor调度器,便是基于这个线程池完成的。能够说,一个通用而高效的线程池,是咱们写C/C++代码时离不开的根底模块。
thrdpool代码方位在src/kernel/,不只能够直接拿来运用,一起也适合阅览学习。
而更重要的,秉承Workflow项目本身一贯的谨慎极简的作风,这个thrdpool代码极致简洁,完成逻辑上亦十分齐备,结构精巧,处处谨慎,不得不让我惊叹:
妙啊!!!
你可能会很好奇,线程池还能写出什么别致的新思路吗?先列出一些,你们细品:
- 特色1:创立完线程池后,无需记载任何线程id或目标,线程池能够经过一个等一个的方法优雅地去完毕一切线程;
- 特色2:线程使命能够由另一个线程使命调起;甚至线程池正在被毁掉时也能够提交下一个使命;(这很重要,因为线程本身很可能是不知道线程池的状况的;
- 特色3:同理,线程使命也能够毁掉这个线程池;(十分完整~
我真的迫不及待为咱们深层解读一下,这个我愿称之为“逻辑齐备”的线程池。
1 – 前置常识
第一部分我先从最基本的内容梳理一些个人了解,有根底的小伙伴能够直接越过。假如有不精确的地方,欢迎咱们指正交流~
为什么需求线程池?(其实思路不只对线程池,对任何有限资源的调度办理都是相似的)
咱们知道,经过体系提供的pthread或者std::thread创立线程,就能够完成多线程并发履行咱们的代码。
可是CPU的核数是固定的,所以真正并发履行的最大值也是固定的,过多的线程创立除了频繁发生创立的overhead以外,还会导致对体系资源进行争抢,这些都是不必要的糟蹋。
因而咱们能够办理有限个线程,循环且合理地使用它们。♻️
那么线程池一般包含哪些内容呢?
- 首先是办理若干个~~~工具人~~~线程;
- 其次是办理交给线程去履行的使命,这个一般会有一个行列;
- 再然后线程之间需求一些同步机制,比方mutex、condition等;
- 终究便是各线程池完成上自身需求的其他内容了;
好了,接下来咱们看看Workflow的thrdpool是怎样做的。
2 – 代码概览
以下共7步常用思路,足以让咱们把代码飞快过一遍。
第1步:先看头文件,模块提供什么接口。
咱们翻开thrdpool.h
,能够只关注三个接口:
// 创立线程池
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize);
// 把使命交给线程池的进口
int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool);
// 毁掉线程池
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool);
第2步:接口上有什么数据结构。
也便是,咱们怎么描绘一个交给线程池的使命。
struct thrdpool_task
{
void (*routine)(void *); // 一个函数指针
void *context; // 一个上下文
};
第3步:再看完成.c,有什么内部数据结构。
struct __thrdpool
{
struct list_head task_queue; // 使命行列
size_t nthreads; // 线程个数
size_t stacksize; // 结构线程时的参数
pthread_t tid; // 运行起来之后,pool上记载的这个是zero值
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_key_t key;
pthread_cond_t *terminate;
};
没有一个剩余,每一个成员都很到位:
- tid:线程id,整个线程池只要一个,它不会古怪地去记载任何一个线程的id,这样就不完美了,它平时运行的时分是空值,退出的时分,它是用来完成链式等候的要害。
- mutex 和 cond是常见的线程间同步的工具,其中这个cond是用来给生产者和顾客去操作使命行列用的。
- key:是线程池的key,然后会赋予给每个由线程池创立的线程作为他们的thread local,用于区别这个线程是否是线程池创立的。
- 咱们还看到一个**pthread_cond_t terminate*,这有两个用处:不只是退出时的标记位 ,并且仍是调用退出的那个人要等候的condition。
以上各个成员的用处,好像说了,又好像没说,是因为几乎每一个成员都值得深挖一下,所以咱们记住它们,后边看代码的时分就会恍然大悟!
第4步:接口都调用了什么中心函数。
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
{
thrdpool_t *pool;
ret = pthread_key_create(&pool->key, NULL);
if (ret == 0)
{
... // 去掉了其他代码,可是留意到方才的tid和terminate的赋值
memset(&pool->tid, 0, sizeof (pthread_t));
pool->terminate = NULL;
if (__thrdpool_create_threads(nthreads, pool) >= 0)
return pool;
...
这儿能够看到__thrdpool_create_threads()
里面最要害的便是循环创立nthreads个线程。
while (pool->nthreads < nthreads)
{
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
...
第5步:略读中心函数的功能。
所以咱们在上一步知道了,每个线程履行的是__thrdpool_routine()
。不难想象,它会不断从行列拿使命出来履行:
static void *__thrdpool_routine(void *arg)
{
...
while (1)
{
// 1. 从行列里拿一个使命出来,没有就等候
pthread_mutex_lock(&pool->mutex);
while (!pool->terminate && list_empty(&pool->task_queue))
pthread_cond_wait(&pool->cond, &pool->mutex);
if (pool->terminate) // 2. 线程池完毕的标志位,记住它,先越过
break;
// 3. 假如能走到这儿,祝贺你,拿到了使命~
entry = list_entry(*pos, struct __thrdpool_task_entry, list);
list_del(*pos);
pthread_mutex_unlock(&pool->mutex); // 4. 先解锁
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry);
task_routine(task_context); // 5. 再履行
// 6. 这儿也先记住它,意思是线程池里的线程能够毁掉线程池
if (pool->nthreads == 0)
{
/* Thread pool was destroyed by the task. */
free(pool);
return NULL;
}
}
... // 后边还有魔法,留下一章解读~~~
第6步:把函数之间的联系联系起来。
方才看到的__thrdpool_routine()
便是线程的中心函数了,它能够和谁相关起来呢?
能够和接口thrdpool_schedule()
相关上。
咱们说过,线程池上有个行列办理使命,
- 所以,每个履行routine的线程,都是顾客;
- 而每个主张schedule的线程,都是生产者;
咱们现已看过顾客了,来看看生产者的代码:
inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
thrdpool_t *pool)
{
struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;
entry->task = *task;
pthread_mutex_lock(&pool->mutex);
list_add_tail(&entry->list, &pool->task_queue); // 添加到行列里
pthread_cond_signal(&pool->cond); // 叫醒在等候的线程
pthread_mutex_unlock(&pool->mutex);
}
提到这儿,特色2
就十分清晰了:
开篇说的特色2
是说,”线程使命能够由另一个线程使命调起”。
只要对行列的办理做得好,显然咱们在顾客所履行的函数也能够做生产者。
第7步:看其他状况的处理,对于线程池来说便是比方毁掉的状况。
只看咱们接口thrdpool_destroy()的完成是十分简略的:
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool)
{
...
// 1. 内部会设置pool->terminate,并叫醒一切等在行列拿使命的线程
__thrdpool_terminate(in_pool, pool);
// 2. 把行列里还没有履行的使命都拿出来,经过pending返回给用户
list_for_each_safe(pos, tmp, &pool->task_queue)
{
entry = list_entry(pos, struct __thrdpool_task_entry, list);
list_del(pos);
if (pending)
pending(&entry->task);
... // 后边便是毁掉各种内存,相同有魔法~
在退出的时分,咱们那些现已提交可是还没有被履行的使命是绝对不能就这么扔掉了的,所以咱们能够传入一个pending()
函数,上层能够做自己的收回、回调、任何确保上层逻辑齐备的工作。
规划的完整性,无处不在。
接下来咱们就能够跟着咱们的中心问题,针对性地看看每个特色都是怎样完成的。
3 – 特色1: 一个等候一个的优雅退出
这儿提出一个问题:线程池要退出,怎么完毕一切线程?
一般线程池的完成都是需求记载下一切的线程id,或者thread目标,以便于咱们去jion等候它们完毕。
可是咱们方才看,pool里并没有记载一切的tid呀?正如开篇说的,pool上只要一个tid,并且仍是个空的值。
所以特色1
给出了Workflow的thrdpool的答案:
无需记载一切线程,我能够让线程挨个主动退出、且一个等候一个,终究到达我调用完thrdpool_destroy()后内存能够收回洁净的目的。
这儿先给一个简略的图,假设主张destroy的人是main线程,咱们怎么做到一个等一个退出:
最简略的:外部线程主张destroy
过程如下:
- 线程的退出,由thrdpool_destroy()设置pool->terminate开始。
- 咱们每个线程,在while(1)里会第一时间发现terminate,线程池要退出了,然后会break出这个while循环。
- 留意这个时分,还持有着mutex锁,咱们拿出pool上唯一的那个tid,放到我的暂时变量,我会根据拿出来的值做不同的处理。且我会把我自己的tid放上去,然后再解mutex锁。
- 那么很显然,第一个从pool上拿tid的人,会发现这是个0值,就能够直接完毕了,不必担任等候任何其他人,但我在彻底完毕之前需求有人担任等候我的完毕,所以我会把我的id放上去。
- 而假如发现自己从pool里拿到的tid不是0值,说明我要担任jion上一个人,并且把我的tid放上去,让下一个人担任我。
- 终究的那个人,是那个发现pool->nthreads为0的人,那么我就能够经过这个terminate(它本身是个condition)去告诉主张destroy的人。
- 终究主张者就能够退了。
是不是十分有意思!!!十分优雅的做法!!!
所以咱们会发现,其实咱们不太需求知道太多信息,只需求知道我要担任的上一个人。
当然每一步都是十分谨慎的,咱们结合方才越过的第一段魔法感受一下:
static void *__thrdpool_routine(void *arg)
{
while (1)
{
pthread_mutex_lock(&pool->mutex); // 1.留意这儿还持有锁
... // 等着行列拿使命出来
if (pool->terminate) // 2. 这既是标识位,也是主张毁掉的那个人所等候的condition
break;
... // 履行拿到的使命
}
/* One thread joins another. Don't need to keep all thread IDs. */
tid = pool->tid; // 3. 把线程池上记载的那个tid拿下来,我来担任上一人
pool->tid = pthread_self(); // 4. 把我自己记载到线程池上,下一个人来担任我
if (--pool->nthreads == 0) // 5. 每个人都减1,终究一个人担任叫醒主张detroy的人
pthread_cond_signal(pool->terminate);
pthread_mutex_unlock(&pool->mutex); // 6. 这儿能够解锁进行等候了
if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0) // 7. 只要第一个人拿到0值
pthread_join(tid, NULL); // 8. 只要不0值,我就要担任等上一个完毕才能退
return NULL; // 9. 退出,干洁净净~
}
4 – 特色2:线程使命能够由另一个线程使命调起
在第二部分咱们看过源码,只要行列办理得好,线程使命里提交下一个使命是彻底OK的。
这很合理。
那么问题来了,特色1
又说,咱们每个线程,是不太需求知道太多线程池的状况和信息的。而线程池的毁掉是个过程,假如在这个过程间提交使命会怎样样呢?
因而特色2
的一个重要解读是:线程池被毁掉时也能够提交下一个使命。并且方才提过,还没有被履行的使命,能够经过咱们传入的pending()函数拿回来。
简略看看毁掉时的谨慎做法:
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
pthread_mutex_lock(&pool->mutex); // 1. 加锁设置标识位
pool->terminate = &term; // 2. 之后的添加使命不会被履行,但能够pending拿到
pthread_cond_broadcast(&pool->cond); // 3. 播送一切等候的顾客
if (in_pool) // 4. 这儿的魔法等下讲>_<~
{
/* Thread pool destroyed in a pool thread is legal. */
pthread_detach(pthread_self());
pool->nthreads--;
}
while (pool->nthreads > 0) // 5. 假如还有线程没有退完,我会等,留意这儿是while
pthread_cond_wait(&term, &pool->mutex);
pthread_mutex_unlock(&pool->mutex);
if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)
pthread_join(pool->tid, NULL); // 6.相同地等候计划退出的上一个人
}
5 – 特色3:相同能够在线程使命里毁掉这个线程池
既然线程使命能够做任何工作,理论上,线程使命也能够毁掉线程池❓
作为一个逻辑齐备的线程池,斗胆一点,咱们把问号去掉。
并且,毁掉并不会完毕当前使命,它会等这个使命履行完。
想象一下,方才的__thrdpool_routine()
,while里拿出来的那个使命,做的工作竟然是主张thrdpool_destroy()
…
咱们来把上面的图改一下:
斗胆点,咱们让一个routine来destroy线程池
假如主张毁掉的人,是咱们自己内部的线程,那么咱们就不是等n个,而是等n-1,少了一个外部线程等候咱们。怎么完成才能让这些逻辑都完美交融呢?咱们把方才越过的三段魔法串起来看看。
第一段魔法,毁掉的主张者。
假如发现主张毁掉的人是线程池内部的线程,那么它具有较强的自我办理意识(因为前面说了,会等它这个使命履行完),而咱们能够放心斗胆地pthread_detach,无需任何人jion它等候它完毕。
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)
{
...
if (in_pool) // 每个由线程池创立的线程都设置了一个key,由此判别是否是in_pool
{
/* Thread pool destroyed in a pool thread is legal. */
pthread_detach(pthread_self());
pool->nthreads--;
}
第二段魔法:线程池谁来free?
一定是主张毁掉的那个人。所以这儿用in_pool来操控main线程的收回:
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool)
{
// 现已调用完第一段,且挨个pending(未履行的task)了
... // 毁掉其他内部分配的内存
if (!in_pool) // 假如不是内部线程主张的毁掉,要担任收回线程池内存
free(pool);
}
那现在不是main线程主张的毁掉呢?主张的毁掉的那个内部线程,怎样能确保我能够在终究关头把一切资源收回洁净、调free(pool)、功成身退呢?
在前面阅览源码第5步,其实咱们看过,__thrdpool_routine()里有free的地方。
所以现在三段魔法终于串起来了。
第三段魔法:谨慎的并发。
static void *__thrdpool_routine(void *arg)
{
while (1)
{
... // 前面履行完一个使命,假如使命里做的工作,是毁掉线程池...
// 留意这个时分,其他内存都现已被destroy的那个清掉了,万万不能够再用什么mutex、cond
if (pool->nthreads == 0)
{
/* Thread pool was destroyed by the task. */
free(pool);
return NULL;
}
...
十分重要的一点,因为并发,咱们是不知道谁先操作的。假设咱们稍微改一改这个顺序,就又是另一番逻辑。
比方我作为一个内部线程,在routine里调用destroy期间,发现还有线程没有履行完,我就要等在我的terminate上,待终究看到nthreads==0的那个人叫醒我。然后我的代码继续履行,函数栈就会从destroy回到routine,也便是上面那几行,然后,free(pool);,这时分我现已放飞自我detach了,能够顺畅完毕。
你看,无论怎么,都能够完美地毁掉线程池:
是不是太妙了!我写到这儿现已要感动哭了!
6 – 简略的用法
这个线程池只要两个文件: thrdpool.h
和 thrdpool.c
,并且只依赖内核的数据结构list.h
。咱们把它拿出来玩,自己写一段代码:
void my_routine(void *context) // 咱们要履行的函数
{
printf("task-%llu start.\n", reinterpret_cast<unsigned long long>(context); );
}
void my_pending(const struct thrdpool_task *task) // 线程池毁掉后,没履行的使命会到这儿
{
printf("pending task-%llu.\n", reinterpret_cast<unsigned long long>(task->context););
}
int main()
{
thrdpool_t *thrd_pool = thrdpool_create(3, 1024); // 创立
struct thrdpool_task task;
unsigned long long i;
for (i = 0; i < 5; i++)
{
task.routine = &my_routine;
task.context = reinterpret_cast<void *>(i);
thrdpool_schedule(&task, thrd_pool); // 调用
}
getchar(); // 卡住主线程,按回车继续
thrdpool_destroy(&my_pending, thrd_pool); // 完毕
return 0;
}
咱们再打印几行log,直接编译就能够跑起来:
简略程度堪比大一上学期C言语作业。
7 – 并发与结构之美
终究谈谈感受。
看完之后我有种很后悔为什么没有早点看的感觉,并且有一种,我必定还没有彻底了解到里面的精华,究竟我不能深刻地了解到规划者当时对并发的构思和模型上的选择。
我只能说,没有十多年尖端的体系调用和并发编程的功底写不出这样的代码,没有极致的审美与对品控的偏执也写不出这样的代码。
并发编程有许多说道,就正如退出这个这么简略的工作,想要做到退出时收回洁净却很难。假如说你写业务逻辑自己管线程,退出什么的sleep(1)都无所谓,但做框架的人假如不能把自己的框架做得完美无暇逻辑自洽,就难免让人感觉差点意思。
而这个thrdpool,它作为一个线程池,是如此地逻辑齐备。
再次让我深深地感到震撼:咱们身边那些原始的、底层的、根底的代码,还有许多新思路,还能够写得如此美。
Workflow项目源码地址:GitHub – sogou/workflow: C++ Parallel Computing and Asynchronous Networking Engine