Linux 您所在的位置:网站首页 c线程池的工作原理 Linux

Linux

2023-04-30 05:34| 来源: 网络整理| 查看: 265

0 分享至

用微信扫码二维码

分享至好友和朋友圈

线程池是一种多线程处理形式,大多用于高并发服务器上,它能合理有效地利用高并发服务器上的线程资源。

在Unix网络编程中,线程与进程用于处理各项分支子功能,我们通常的操作是:

接收消息 ==> 消息分类 ==> 线程创建 ==> 传递消息到子线程 ==> 线程分离 ==> 在子线程中执行任务 ==> 任务结束退出。

对大多数小型局域网的通信来说,上述方法足够满足需求;但当我们的通信范围扩大到广域网或大型局域网通信中时,我们将面临大量消息频繁请求服务器。在这种情况下,创建与销毁线程都已经成为一种奢侈的开销,特别对于嵌入式服务器来说更应保证内存资源的合理利用。

因此,线程池技术应运而生;线程池允许一个线程可以多次复用,且每次复用的线程内部的消息处理可以不相同,将创建与销毁的开销省去,而不必来一个请求开一个线程;

结构讲解:

线程池是一个抽象的概念,其内部由任务队列,一堆线程,管理者线程组成;

我们将以上图为例,实现一个最基础的线程池,接下来将分部分依次讲解;讲解顺序为:

1.线程池总体结构

2.线程数组

3.任务队列

4.管理者线程

5.使用线程池接口的例子

一、线程池总体结构

这里讲解线程池在逻辑上的结构体。

看下方代码,该结构体threadpool_t中包含线程池状态信息,任务队列信息以及多线程操作中的互斥锁;在任务结构体中包含了一个可以放置多种不同任务函数的函数指针,一个传入该任务函数的void *类型的参数。

注意:在使用时需要将你的消息分类处理函数装入任务的(*function); 然后放置到任务队列并通知空闲线程。

线程池状态信息:描述当前线程池的基本信息,如是否开启、最小线程数、最大线程数、存活线程数、忙线程数、待销毁线程数等… …

任务队列信息:描述当前任务队列基本信息,如最大任务数、队列不为满条件变量、队列不为空条件变量等… …

多线程互斥锁:保证在同一时间点上只有一个线程在任务队列中取任务并修改任务队列信息、修改线程池信息;

函数指针:在打包消息阶段,将分类后的消息处理函数放在(*function);

void * 类型参数:用于传递消息处理函数需要的信息;

/*任务*/typedef structvoid *(*function)(void *);void *arg;} threadpool_task_t;

/*线程池管理*/struct threadpool_t{pthread_mutex_t lock; /* 锁住整个结构体 */pthread_mutex_t thread_counter; /* 用于使用忙线程数时的锁 */pthread_cond_t queue_not_full; /* 条件变量,任务队列不为满 */pthread_cond_t queue_not_empty; /* 任务队列不为空 */

pthread_t *threads; /* 存放线程的tid,实际上就是管理了线 数组 */pthread_t admin_tid; /* 管理者线程tid */threadpool_task_t *task_queue; /* 任务队列 */

/*线程池信息*/int min_thr_num; /* 线程池中最小线程数 */int max_thr_num; /* 线程池中最大线程数 */int live_thr_num; /* 线程池中存活的线程数 */int busy_thr_num; /* 忙线程,正在工作的线程 */int wait_exit_thr_num; /* 需要销毁的线程数 */

/*任务队列信息*/int queue_front; /* 队头 */int queue_rear; /* 队尾 */int queue_size;

/* 存在的任务数 */int queue_max_size; /* 队列能容纳的最大任务数 *//*线程池状态*/int shutdown; /* true为关闭 */};/*创建线程池*/threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size){ /* 最小线程数 最大线程数 最大任务数*/int i;threadpool_t *pool = NULL;do{/* 线程池空间开辟 */if ((pool=(threadpool_t *)malloc(sizeof(threadpool_t))) == NULL){printf("malloc threadpool false; \n");break;}/*信息初始化*/pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num;pool->wait_exit_thr_num = 0;pool->queue_front = 0;pool->queue_rear = 0;pool->queue_size = 0;pool->queue_max_size = queue_max_size;pool->shutdown = false;

/* 根据最大线程数,给工作线程数组开空间,清0 */pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);if (pool->threads == NULL){printf("malloc threads false;\n");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);

/* 队列开空间 */pool->task_queue =(threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);

if (pool->task_queue == NULL){printf("malloc task queue false;\n");break;}

/* 初始化互斥锁和条件变量 */if ( pthread_mutex_init(&(pool->lock), NULL) != 0 ||pthread_mutex_init(&(pool->thread_counter), NULL) !=0 ||pthread_cond_init(&(pool->queue_not_empty), NULL) !=0 ||pthread_cond_init(&(pool->queue_not_full), NULL) !=0){printf("init lock or cond false;\n");break;}

/* 启动min_thr_num个工作线程 */for (i=0; i {/* pool指向当前线程池 threadpool_thread函数在后面讲解 */pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);printf("start thread 0x%x... \n", (unsigned int)pool->threads[i]);}/* 管理者线程 admin_thread函数在后面讲解 */pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool);

return pool;} while(0);

/* 释放pool的空间 */threadpool_free(pool);return NULL;}二、线程数组

线程数组实际上是在线程池初始化时开辟的一段存放一堆线程tid的空间,在逻辑上形成一个池,里面放置着提前创建的线程;这段空间中包含了正在工作的线程,等待工作的线程(空闲线程),等待被销毁的线程,声明但没有初始化的线程空间;

/*工作线程*/void *threadpool_thread(void *threadpool)threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;

while (true){pthread_mutex_lock(&(pool->lock));

/* 无任务则阻塞在 “任务队列不为空” 上,有任务则跳出 */while ((pool->queue_size == 0) && (!pool->shutdown)){printf("thread 0x%x is waiting \n", (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));

/* 判断是否需要清除线程,自杀功能 */if (pool->wait_exit_thr_num > 0){pool->wait_exit_thr_num--;/* 判断线程池中的线程数是否大于最小线程数,是则结束当前线程 */if (pool->live_thr_num > pool->min_thr_num){printf("thread 0x%x is exiting \n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);//结束线程}}}

/* 线程池开关状态 */if (pool->shutdown) //关闭线程池{pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting \n", (unsigned int)pthread_self());pthread_exit(NULL); //线程自己结束自己}

//否则该线程可以拿出任务task.function = pool->task_queue[pool->queue_front].function; //出队操作task.arg = pool->task_queue[pool->queue_front].arg;

pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; //环型结构pool->queue_size--;

//通知可以添加新任务pthread_cond_broadcast(&(pool->queue_not_full));

//释放线程锁pthread_mutex_unlock(&(pool->lock));

//执行刚才取出的任务printf("thread 0x%x start working \n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter)); //锁住忙线程变量pool->busy_thr_num++;pthread_mutex_unlock(&(pool->thread_counter));

(*(task.function))(task.arg); //执行任务

//任务结束处理printf("thread 0x%x end working \n", (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--;pthread_mutex_unlock(&(pool->thread_counter));}

pthread_exit(NULL);}三、任务队列

任务队列的存在形式与线程数组相似;在线程池初始化时根据传入的最大任务数开辟空间;当服务器前方后请求到来后,分类并打包消息成为任务,将任务放入任务队列并通知空闲线程来取;不同之处在于任务队列有明显的先后顺序,先进先出;而线程数组中的线程则是一个竞争关系去拿到互斥锁争取任务。

/*向线程池的任务队列中添加一个任务*/int threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg)pthread_mutex_lock(&(pool->lock));

/*如果队列满了,调用wait阻塞*/while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));

/*如果线程池处于关闭状态*/if (pool->shutdown){pthread_mutex_unlock(&(pool->lock));return -1;}

/*清空工作线程的回调函数的参数arg*/if (pool->task_queue[pool->queue_rear].arg != NULL){free(pool->task_queue[pool->queue_rear].arg);pool->task_queue[pool->queue_rear].arg = NULL;}

/*添加任务到任务队列*/pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 逻辑环 */pool->queue_size++;

/*添加完任务后,队列就不为空了,唤醒线程池中的一个线程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));

return 0;}四、管理者线程

作为线程池的管理者,该线程的主要功能包括:

检查线程池内线程的存活状态,工作状态;

负责根据服务器当前的请求状态去动态的增加或删除线程,保证线程池中的线程数量维持在一个合理高效的平衡上;

说到底,它就是一个单独的线程,定时的去检查,根据我们的一个维持平衡算法去增删线程。

/*管理线程*/void *admin_thread(void *threadpool)int i;threadpool_t *pool = (threadpool_t *)threadpool;while (!pool->shutdown)printf("admin -----------------\n");sleep(DEFAULT_TIME); /*隔一段时间再管理*/pthread_mutex_lock(&(pool->lock)); /*加锁*/int queue_size = pool->queue_size; /*任务数*/int live_thr_num = pool->live_thr_num; /*存活的线程数*/pthread_mutex_unlock(&(pool->lock)); /*解锁*/

pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num; /*忙线程数*/pthread_mutex_unlock(&(pool->thread_counter));

printf("admin busy live -%d--%d-\n", busy_thr_num, live_thr_num);/*创建新线程 实际任务数量大于 最小正在等待的任务数量,存活线程数小于最大线程数*/if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num max_thr_num){printf("admin add-----------\n");pthread_mutex_lock(&(pool->lock));int add=0;

/*一次增加 DEFAULT_THREAD_NUM 个线程*/for (i=0; i max_thr_num && add && pool->live_thr_num < pool->max_thr_num; i++){if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])){pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;printf( "new thread -----------------------\n");}}

pthread_mutex_unlock(&(pool->lock));}

/*销毁多余的线程 忙线程x2 都小于 存活线程,并且存活的大于最小线程数*/if ((busy_thr_num*2) < live_thr_num && live_thr_num > pool->min_thr_num){// printf( "admin busy --%d--%d----\n", busy_thr_num, live_thr_num);/*一次销毁DEFAULT_THREAD_NUM个线程*/pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;pthread_mutex_unlock(&(pool->lock));

for (i=0; i {//通知正在处于空闲的线程,自杀pthread_cond_signal(&(pool->queue_not_empty));printf( "admin cler --\n");}}}return NULL;

/*线程是否存活*/int is_thread_alive(pthread_t tid){int kill_rc = pthread_kill(tid, 0); //发送0号信号,测试是否存活if (kill_rc == ESRCH) //线程不存在{return false;}return true;}五、释放/*释放线程池*/int threadpool_free(threadpool_t *pool){if (pool == NULL)return -1;if (pool->task_queue)free(pool->task_queue);if (pool->threads){free(pool->threads);pthread_mutex_lock(&(pool->lock)); /*先锁住再销毁*/pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool = NULL;

return 0;}/*销毁线程池*/int threadpool_destroy(threadpool_t *pool){int i;if (pool == NULL){return -1;}pool->shutdown = true;

/*销毁管理者线程*/pthread_join(pool->admin_tid, NULL);

//通知所有线程去自杀(在自己领任务的过程中)for (i=0; i live_thr_num; i++){pthread_cond_broadcast(&(pool->queue_not_empty));}

/*等待线程结束 先是pthread_exit 然后等待其结束*/for (i=0; i live_thr_num; i++){pthread_join(pool->threads[i], NULL);}

threadpool_free(pool);return 0;}六、接口/* 线程池初始化,其管理者线程及工作线程都会启动 */threadpool_t *thp = threadpool_create(10, 100, 100);printf("threadpool init ... ... \n");

/* 接收到任务后添加 */threadpool_add_task(thp, do_work, (void *)p);

// ... ...

/* 销毁 */threadpool_destroy(thp);

原文:https://blog.csdn.net/qq_36359022/article/details/78796784

文章来源于网络,版权归原作者所有,如有侵权,请联系删除。

EOF

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

/阅读下一篇/ 返回网易首页 下载网易新闻客户端


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有