网站首页 > 文章精选 正文
线程池解决了什么问题?
首先我们需要了解为什么要设计线程池,其到底解决了什么问题。
线程创建和销毁的开销是很大的,需要为其分配内存,将其加入调度队列由操作系统进行调度。而线程池的目的就是减少线程的频繁创建和销毁,维持一定合理数量的线程,“需要时取,用完时还”。(连接池的目的也类似,其维持一定数量连接的缓存池,尽量重用已有的连接,减少创建和关闭连接的频率;)线程池和连接池在一定程度上缓解了频繁调用IO接口带来的资源占用。
线程池设计的基本思想
清楚了线程池设计的目的,接下来思考如何设计一个线程池。
线程池的基本思想为:生产者-消费者模型。使用两个链表分别表示生产者(待处理的工作任务Jobs)和消费者(包括所有线程Threads),并通过一些同步原语来协调二者之间的工作。如下图:
线程池初始化时,会创建一定数量的线程并放入Threads链表中,每个线程处理函数开启一个死循环,通过条件变量等待信号的到来;当有新的任务到来时,会加入Jobs中,并同时通过信号唤醒线程处理相应任务。这就是一个简单的线程池设计思路。
线程池的实现
链表操作
因为涉及链表的使用,首先使用宏定义实现链表中节点的添加和删除,如下:
// 链表操作
#define LL_ADD(item, list) do { \
item->prev = NULL: \
item->next = list; \
list = item; \
} while(0)
#define LL_REMOVE(item, list) do { \
if(item->next) item->next->prev = item->prev; \
if(item->prev) item->prev->next = item->next; \
if(item == list) list = item->next; \
item->next = item->prev = NULL; \
}while(0)
数据结构
接下来需要实现链表中线程和工作任务的节点数据结构,以及管理所有线程、工作任务和同步原语的线程池数据结构,如下:
// 线程
struct thread_data {
pthread_t thread;
int terminate;
struct thread_pool *pool;
struct thread_data *next;
struct thread_data *prev;
};
// 工作任务
struct job_data {
void* user_data;
void (*func)(struct job_data*);
struct job_data *next;
struct job_data *prev;
};
// 线程池
struct thread_pool {
struct thread_data *threads;
struct job_data *jobs;
pthread_mutex_t mutex; /*互斥量*/
pthread_cond_t cond; /*信号量*/
};
线程的初始化
线程池初始化的流程,可以指定线程池中线程的数量,如下:
// 创建线程池
int create_thread_pool(struct thread_pool *pool, int num_threads) {
if (num_threads < 1) num_threads = 1;
memset(pool, 0, sizeof(struct thread_pool));
// 初始化锁+信号量
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
// 创建工作线程
int i;
for(i = 0;i < num_threads;i ++){
struct thread_data *td = (struct thread_data*)malloc(sizeof(*td));
if(td == NULL) {
perror("thread_data");
exit(1);
}
td->pool = pool;
int ret = pthread_create(&pool->thread, NULL, thread_func, (void*)td);
if (ret) {
perror("pthread_create");
free(td);
exit(1);
}
LL_ADD(td, pool->threads);
}
return 0;
}
实现每个线程创建后的回调函数thread_func,通过互斥量和条件变量来等待和处理工作任务,注意,在拿到对应的Job后,需要将其从Jobs链表中REMOVE,避免多个线程处理同一个任务,如下:
// 线程创建后回调接口
static void * thread_func(void *ptr) {
struct thread_data *td = (struct thread_data*)ptr;
while (1) {
pthread_mutex_lock(&td->pool->mutex);
while(&td->pool->jobs == NULL) { // 没有任务
if(td->terminate) break; // 线程终止
pthread_cond_wait(&td->pool->cond, &td->pool->mutex);
}
if(td->terminate){
pthrad_mutex_unlock(&td->pool->mutex);
exit(1);
}
struct job_data* job = td->pool->jobs;
if(job != NULL) {
LL_REMOVE(job, td->pool->jobs);
}
pthread_mutex_unlock(&td->pool->mutex);
if(job == NULL) {
continue;
}
job->func(job);
}
free(td);
pthread_exit(NULL);
}
任务的产生
介绍完线程池消费模型后,来介绍下任务的生产方式:1)需要获得互斥锁,2)将任务添加至Jobs链表中,3)通过pthread_cond_signal唤醒线程,4)释放互斥锁,一气呵成。
// 生成任务
int push_job(struct thread_pool *pool, struct job_data *job) {
pthread_mutex_lock(&pool->mutex);
LL_ADD(job, pool->jobs);
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
return 0;
}
使用实例
void counter(job_data *job) {
int i = *(int*)job->user_data;
printf("counter: %d, threadid: %lu\n", i, pthread_self());
free(job->user_data);
free(job);
}
int main(int argc, char * argv[]) {
thread_pool pool;
create_thread_pool(&pool, 4);
int i;
for(i = 0;i < 30; i++) {
job_data * job = (job_data *)malloc(sizeof(job_data));
if(job == NULL){
perror("malloc");
exit(1);
}
job->func = counter;
job->user_data = malloc(sizeof(int));
*(int*)job->user_data = i;
push_job(&pool, job);
}
getchar();
printf("\n");
return 0;
}
Nginx线程池实现
介绍完一个简单的线程池实现,接下来学习学习Nginx中线程池是如何做的,其实大体思路跟上述实现差不多,只不过多了更多的细节考虑,这部分主要也是代码展示,主要代码在文件ngx_thread_pool.c中。
线程池结构体
// 线程池结构体
struct ngx_thread_pool_s {
// 互斥量 锁定操作waiting/queue/ngx_thread_poll_task_id
ngx_thread_mutex_t mtx;
// 待处理任务队列 ngx_thread_task_post(任务放入线程池);ngx_thread_pool_cycle(消费任务)
ngx_thread_pool_queue_t queue;
// 等待的任务数
ngx_int_t waiting;
// 条件变量,用于等待任务队列queue
ngx_thread_cond_t cond;
// 线程池名字
ngx_str_t name;
// 线程的数量,默认32个
ngx_uint_t threads;
// 任务等待队列,默认65535
ngx_int_t max_queue;
...
};
typedef struct ngx_thread_pool_s ngx_thread_pool_t;
线程池初始化
// 线程池初始化
static ngx_int_t
ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool){
pthread_t tid;
// 初始化线程池任务队列,first/last都空
ngx_thread_pool_queue_init(&tp->queue);
// 创建互斥量
ngx_thread_mutex_create(&tp->mtx, log);
// 创建条件变量
ngx_thread_cond_create(&tp->cond, log);
// 根据配置,创建线程
for (n = 0; n < tp->threas; n++){
pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
}
}
线程运行函数
线程执行函数同样为一个死循环。
// 线程运行函数,无限循环;从待处理任务队列里获取任务,然后执行task->handler(task->ctx)
// 处理完的任务加入完成队列
static void *
ngx_thread_pool_cycle(void *data) {
ngx_thread_pool_t *tp = data;
// 无限循环
// 从待处理任务队列里获取任务,然后执行task->handler(task->ctx)
for ( ;; ) {
// 锁定互斥量,防止多线程操作的竞态
ngx_thread_mutex_lock(&tp->ntx, tp->log);
// 即将处理一个任务,计数器-1
tp->waiting--;
// 如果任务队列是空,那么使用条件变量等待
while (tp->queue.first == NULL) {
ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log);
// (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
}
// 取任务
task = tp->queue.first;
tp->queue.first = task->next;
// 如果此时任务队列空,调整指针
if (tp->queue.first == NULL) {
tp->queue.last = &tp->queue.first;
}
// 操作完waiting,queue后解锁,其它线程可以获取task处理
ngx_thread_mutex_unlock(&tp->mtx, tp->log);
// 调用任务的handler,传递ctx,执行用户定义操作,同时阻塞的
task->handler(task->ctx, tp->log);
task->next = NULL;
// 自旋锁保护完成队列
ngx_spinlock(&ngx_thrad_pool_done_lock, 1, 2048);
// 处理完的任务加入队列
*ngx_thread_pool_done.last = task;
ngx_thread_pool_done.last = &task->next;
// 自旋锁解锁
ngx_unlock(&ngx_thread_pool_done_lock);
// 使用event模块的通知函数
// 让主线程nginx的epoll触发事件,调用ngx_thread_pool_handler,分发处理线程完成的任务
// 调用系统函数eventfd,创建一个可以用于通知的描述符,用于实现notify
(void) ngx_notify(ngx_thread_pool_handler);
}
}
// 分发处理线程完成的任务,在主线程里执行
// 调用event->handler,即异步事件完成后的回调函数
static void
ngx_thread_pool_handler(ngx_event_t *ev) {
ngx_event_t *event;
ngx_thread_task_t *task;
// 自旋锁保护完成队列
ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
// 取出队列里的task,task->next有很多已经完成的任务
task = ngx_thread_pool_done.first;
// 队列置空
ngx_thread_pool_done.first = NULL;
ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
//自旋锁解锁
ngx_unlock(&ngx_thread_pool_done_lock);
// 遍历完成已经完成的任务
while (task) {
// 取task里的事件对象
event = &task->event;
task = task->next;
event->complete = 1; // 线程异步事件已经完成
event->active = 0; // 事件处理完成
even->handler(event); // 调用异步事件完成后的回调函数
}
}
任务生产
// 任务推送
// 把任务放入线程池,由线程池分配线程执行
// 锁定互斥量,防止多线程的竞态
ngx_int_t
ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task) {
ngx_thread_mutex_lock(&tp->mtx, tp->log);
if (tp->waiting >= tp->max_queue){
// 等待处理任务大于设置的最大队列数
return;
}
// 条件变量,发送信号,在ngx_thread_pool_cycle里解除对队列的等待
ngx_thread_cond_signal(&tp->cond, tp->log);
// 任务加入待处理队列
*tp->queue.last = task;
tp->queue.last = &task->next;
// 等待任务增加
tp->waiting++;
// 解锁
(void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
}
关注公众号:程序员不是码农,发现更多精彩~
- 上一篇: Java Web应用调优线程池:没你想的那么复杂
- 下一篇: Qt多线程1:QThread
猜你喜欢
- 2025-05-25 Java线程池配置与调优:让程序跑得更快更稳
- 2025-05-25 Java线程:从青铜到王者的必修课(附实战代码)
- 2025-05-25 Linux系统编程—线程属性
- 2025-05-25 RT-Thread快速入门-线程管理(上)
- 2025-05-25 5分钟学会C/C++多线程编程进程和线程
- 2025-05-25 终于明白:有了线程,为什么还要有协程?
- 2025-05-25 多线程编程精要:从用户线程到线程池的效能进化论
- 2025-05-25 多线程——线程池的正确打开方式
- 2025-05-25 Spring Boot3 中多线程技术的使用指南
- 2025-05-25 线程的状态有哪些?它是如何工作的?
- 最近发表
-
- 面试中常被问到的Hash表,你了解吗
- JAVA面试考点:一文搞懂一致性Hash的原理和实现
- 一次性搞清楚equals和hashCode(hashcode() 与equals()区别,简单说明)
- HashMap.Key的故事:Key为什么出现Hash碰撞及冲突呢?
- hash冲突的几种解决方案对比(hash冲突的解决方式)
- 游戏王LN 无头骑士(无头骑士cv)
- Linux ln、unlink命令用法(linux link命令详解)
- n和l分不清矫正发音方法,这三步就够了
- golang引用私有gitlab项目代码(golang引入当前包下的文件)
- Instamic:录音领域中的 GoPro,让你想录就录,随心所欲
- 标签列表
-
- newcoder (56)
- 字符串的长度是指 (45)
- drawcontours()参数说明 (60)
- unsignedshortint (59)
- postman并发请求 (47)
- python列表删除 (50)
- 左程云什么水平 (56)
- 计算机网络的拓扑结构是指() (45)
- 编程题 (64)
- postgresql默认端口 (66)
- 数据库的概念模型独立于 (48)
- 产生系统死锁的原因可能是由于 (51)
- 数据库中只存放视图的 (62)
- 在vi中退出不保存的命令是 (53)
- 哪个命令可以将普通用户转换成超级用户 (49)
- noscript标签的作用 (48)
- 联合利华网申 (49)
- swagger和postman (46)
- 结构化程序设计主要强调 (53)
- 172.1 (57)
- apipostwebsocket (47)
- 唯品会后台 (61)
- 简历助手 (56)
- offshow (61)
- mysql数据库面试题 (57)