linux高效率编程:epoll和多线程 您所在的位置:网站首页 epoll和socket关系 linux高效率编程:epoll和多线程

linux高效率编程:epoll和多线程

2023-10-14 21:23| 来源: 网络整理| 查看: 265

一、epoll简介

epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。

相对于select方法,主要优点有2个:1. 支持一个进程打开大数目的socket描述符。2. IO效率不随FD数目增加而线性下降。

IO效率的提升的:select/poll会因为监听fd的数量而导致效率低下,因为它是轮询所有fd,有数据就处理,没数据就跳过,所以fd的数量增加会降低效率;而epoll只处理就绪的fd,它有一个就绪设备的队列,每次只轮询该队列的数据,然后进行处理(就绪队列的信息正是通过用户自定义的结构来告知的)。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核与用户空间mmap同一块内存实现的。而poll与select的主要区别在于,select需要为读、写、异常事件分配创建一个描述符集合,最后轮询的时候,需要分别轮询这三个集合。而poll只需要一个集合,在每个描述符对应的结构上分别设置读、写、异常事件,最后轮询的时候,可以同时检查三种事件。poll与select在处理思想上是同一个层次,当然poll相对于select又优化,而epoll,则是完全不同的机制,有本质上的区别。

 

EPOLL事件有两种模型 Level Triggered (LT) 和 Edge Triggered (ET):

LT(level triggered,水平触发模式)是缺省的工作方式,并且同时支持 block 和 non-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。

ET(edge-triggered,边缘触发模式)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,等到下次有新的数据进来的时候才会再次出发就绪事件。如果一直不对这个fd进行I/O操作,导致fd变为未就绪时,内核同样不会发送更多的通知,因为only once。所以这种方式下,出错率比较高,需要增加一些检测程序。

 

二、epoll的函数

引用头文件:

#include

1. 创建epoll fd函数

int epoll_create(int size);

epoll_create()创建一个epoll的事例,通知内核需要监听size个fd。size指的并不是最大的后备存储设备,而是衡量内核内部结构大小的一个提示。当创建成功后,会占用一个fd,所以记得在使用完之后调用close(),否则fd可能会被耗尽。

 

自从Linux2.6.8版本以后,size值其实是没什么用的,不过要大于0,因为内核可以动态的分配大小,所以不需要size这个提示了。

另,int epoll_create1(int flag); 是在linux 2.6.27中加入的函数。当flag是0时,表示和epoll_create函数完全一样,不需要size的提示了。当flag = EPOLL_CLOEXEC,创建的epfd会设置FD_CLOEXEC当flag = EPOLL_NONBLOCK,创建的epfd会设置为非阻塞一般用法都是使用EPOLL_CLOEXEC.

 

2. epoll事件的注册函数

int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);

第一个参数epfd,为epoll_create返回的的epoll fd。

 

第二个参数op表示操作值。有三个操作类型,

EPOLL_CTL_ADD // 注册目标fd到epfd中,同时关联内部event到fd上 EPOLL_CTL_MOD // 修改已经注册到fd的监听事件 EPOLL_CTL_DEL // 从epfd中删除/移除已注册的fd,event可以被忽略,也可以为NULL

第三个参数fd表示需要监听的fd。

 

第四个参数event表示需要监听的事件,

typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */

events参数是一个枚举的集合,可以用” | “来增加事件类型,枚举如下:

EPOLLIN //表示对应的文件描述符可以读(包括对端SOCKET正常关闭); EPOLLOUT //表示对应的文件描述符可以写; EPOLLPRI //表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); EPOLLERR //表示对应的文件描述符发生错误;epoll_wait会一直等待这个事件,所以一般没必要设置这个属性。 EPOLLHUP //表示对应的文件描述符被挂断;epoll_wait会一直等待这个事件,所以一般没必要设置这个属性。 EPOLLET //将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。epoll的默认工作方式是LT EPOLLRDHUP //(since Linux 2.6.17)表示套接字关闭了连接,或者关闭了正写一半的连接。 EPOLLONESHOT //(since Linux 2.6.2)只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

用户可以用 epoll_data 这个 union 在 epoll_event 里面附带一些自定义的信息,这个 epoll_data 会随着 epoll_wait 返回的 epoll_event 一并返回。

 

epoll_data是给用户自由使用的,epoll 不关心里面的内容。一般真正使用起来,事实上第一个就足够了,也就是void *,用来包装任何自定义的结构体(epoll_data是一个联合体,只能使用其中一个,fd等是提供给较简单的应用场景来方便使用的)。

3. epoll等待事件函数

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);

第一个参数:表示epoll_wait等待epfd上的事件,由epoll_create 生成的epoll专用的文件描述符。

 

第二个参数:events指针携带有epoll_data_t数据,用于回传代处理事件的数组。

第三个参数:maxevents当前需要监听的所有socket句柄数。

第四个参数:timeout表示超时时间(单位:毫秒)。

epoll_pwait(since linux 2.6.19)允许一个应用程序安全的等待,直到fd设备准备就绪,或者捕获到一个信号量。其中sigmask表示要捕获的信号量。

返回值:函数如果等待成功,则返回fd的数字;0表示等待fd超时,其他错误号请查看errno

 

三、实例

一个发送压力工具的Demo,主要是演示相关技术的使用。

在linux下使用多线程、epoll编程、socket编程技术实现,系统设置为'ulimit -n' > 10240,可以稳定达到短连接上万的连接,长连接5000个。

设计要点如下:

1. 主函数中循环读取请求数据,新建的3个子线程分别去建立连接,发送请求,接收请求。会初始化2个epoll来分别监听可以写数据(发送请求)的事件和可以读数据(接收请求)的事件。

2. 使用的数据结构为,将socket句柄和相关操作封装在query类中,该query类带有前向和后向指针,该query类会在注册epoll监听队列时通过用户结构体按个指针参数传进去。有3个query类的双链表分别表示空闲队列freeList,准备好发送请求队列readyList,和工作队列workList,这3个队列会被多线程操作,故使用共享锁来排斥性访问。之所以使用双链表结构是便于处理epoll事件时,直接将query切换队列,而无需去遍历。

3. 主线程等待freeList不为空,则取下其中一个结点,读取请求数据,填充到该结点中。并将结点从freeList队列转移到readyList队列。

4. 建立连接子线程会循环等待发送时间到来,以20ms为间隔,每次都新建‘压力值*20/1000’个socket连接,建立成功后则将这些结点加入到send_epoll的队列上。并将结点从readyList队列转移到workList队列。

5. 发送请求子线程会循环等待send_epoll上发生的写事件,并处理就绪的query类,发送请求。请求发送成功后,将该结点从send_epoll的队列中删除,注册到recv_epoll的队列上。

6. 接收请求子线程会循环等待recv_epoll上发生的读事件,并处理就绪的query类,接收请求。接收请求成功后,将结点从recv_epoll的队列上删除,并将结点放回到freeList或readyList。

/*FileName: myPressTool.cpp*/ #include #include #include #include #include #include "conf.h" #include "query.h" #include "query_list.h" #include "control_send.h" #include "send_req.h" #include "recv_res.h" #include "data.h" //全局变量定义 conf_t g_conf; conf_t* g_pconf = NULL; /*配置*/ CQuery* g_pfree_list = NULL; /*无数据的CQuery队列*/ CQuery* g_pready_list = NULL; /*有数据准备建立连接的CQuery队列*/ CQuery* g_pwork_list = NULL; /*发送或接收结果状态的CQuery队列*/ int g_send_epoll_fd; int g_recv_epoll_fd; bool g_over = false; size_t g_query_num = 0; int check_res_callback(char* buffer,int len){ // //printf("call call_back funtion %s %d\n",buffer,len); return 0; } int init_main() { g_pconf = &g_conf; if(0 != load_config()){return -1;} /*初始化g_pconf变量*/ /*初始化CQuery队列*/ g_query_num = g_pconf->velocity * g_pconf->BUFFER_NUM; g_pfree_list = new CQuery[g_query_num]; if( NULL == g_pfree_list ){ return -1; } g_pfree_list[0].set_pre_query(NULL); g_pfree_list[0].set_next_query(&g_pfree_list[1]); for(size_t i=1;i (g_send_epoll_fd = epoll_create(g_query_num))){ /*创建发送epoll,其参数已无意义,只需要一个非负数即可*/ return -2; } if(0 > (g_recv_epoll_fd = epoll_create(g_query_num))){ /*创建接收epoll,其参数已无意义,只需要一个非负数即可*/ return -2; } /*初始化共享锁*/ init_query_list_lock(); return 0; } int clean_main() { CQuery* tp = g_pwork_list; CQuery* tdel = NULL; while(NULL != tp){ tp->close_socket(); tdel=tp; tp=tp->get_next_query(); delete tdel; } tp = g_pready_list; while(NULL != tp){ tp->close_socket(); tdel=tp; tp=tp->get_next_query(); delete tdel; } tp = g_pfree_list; while(NULL != tp){ tp->close_socket(); tdel=tp; tp=tp->get_next_query(); delete tdel; } /*关闭epoll socket*/ close(g_send_epoll_fd); close(g_recv_epoll_fd); /*销毁共享锁*/ destroy_query_list_lock(); return 0; } int main() { pthread_t control_pid; pthread_t send_pid; pthread_t recv_pid; if(0 != init_main()){return -1;} /*初始化全局变量*/ //忽略SIGPIPE信号,该信号默认是使当前程序退出。当目标机器的socket已经关闭连接时,再调用write()发送数据会收到一个RST响应,第二次调用write()发送数据时会先调用SIGPIPE响应函数,然后write返回-1,errno号为EPIPE(32) signal(SIGPIPE, SIG_IGN); //新建3个子线程 if(0 != pthread_create(&control_pid, NULL, control_send_main, NULL)){clean_main();return -2;} if(0 != pthread_create(&send_pid, NULL, send_req_main, NULL)){clean_main();return -2;} if(0 != pthread_create(&recv_pid, NULL, recv_res_main, (void*)check_res_callback)){clean_main();return -2;} //循环读取数据,数据全部读完返回1,free队列用完返回0 while(0 == read_data()){ usleep(DEFAULT_SEND_EACH_TIME); /*等待一个发送间隔后再读取数据*/ //printf("======WARNNING: free list is NULL!\n"); } //等待使用中的队列为空再结束 while( NULL != g_pwork_list ){ usleep(100000); } g_over = true; /*通知子线程结束*/ pthread_join(send_pid,NULL); pthread_join(recv_pid,NULL); pthread_join(control_pid,NULL); if(0 != clean_main()){return -3;} /*清理全局变量*/ return 0; } /*FileName: control_send.h*/ #ifndef __CONTROL_SEND_H__ #define __CONTROL_SEND_H__ #include #include #include #include "query_list.h" #include "conf.h" //定义每次发送时间,默认为10毫秒 #define DEFAULT_SEND_EACH_TIME 10000 void* control_send_main(void *); void prepare_to_send(); bool wait_send_time(); void init_c_data(); extern int g_send_epoll_fd; extern conf_t* g_pconf; extern bool g_over; typedef struct _control_send_thread_data{ struct timeval send_time; struct timeval current_time; int start_time; int compute_time; size_t current_velocity; //修正压力, float fix_velocity; }control_send_thread_data; #endif /*FileName: control_send.cpp*/ #include "control_send.h" control_send_thread_data c_data; void init_c_data(){ gettimeofday(&c_data.send_time,NULL); c_data.start_time = c_data.send_time.tv_sec; c_data.compute_time = c_data.start_time; c_data.current_velocity = g_pconf->velocity; c_data.fix_velocity =0.0; //计算进位 if(c_data.send_time.tv_usec + DEFAULT_SEND_EACH_TIME > 999999){ c_data.send_time.tv_usec = c_data.send_time.tv_usec + DEFAULT_SEND_EACH_TIME - 1000000; c_data.send_time.tv_sec++; } else { c_data.send_time.tv_sec = c_data.send_time.tv_sec; c_data.send_time.tv_usec = c_data.send_time.tv_usec + DEFAULT_SEND_EACH_TIME; } } //循环等待到达发送时间,本函数保证每隔DEFAULT_SEND_EACH_TIME=10ms可以准许发送一次 bool wait_send_time(){ struct timeval tv; while(true){ /*等待当前时间到达发送时间,在prepare_to_send()函数中计算下次的发送时间*/ gettimeofday(&tv,NULL); if((tv.tv_usec > c_data.send_time.tv_usec && tv.tv_sec == c_data.send_time.tv_sec) || (tv.tv_sec - c_data.send_time.tv_sec)*1000000 + tv.tv_usec - c_data.send_time.tv_usec > 0 ){ c_data.current_time.tv_sec =tv.tv_sec; /*准许发送,记录当前时间*/ c_data.current_time.tv_usec = tv.tv_usec; return true; } usleep(DEFAULT_SEND_EACH_TIME/100); /*等待发送间隔的100分之一*/ } return false; } void* control_send_main(void *){ init_c_data(); while(!g_over){ if(wait_send_time()){ prepare_to_send(); } } return NULL; } void prepare_to_send(){ /*计算下次发送的时间*/ int time_pass = (c_data.current_time.tv_sec - c_data.send_time.tv_sec)*1000000 + (c_data.current_time.tv_usec-c_data.send_time.tv_usec); if(0 > time_pass){ time_pass =0; /*出错情况,修正后继续发送*/ } if(c_data.current_time.tv_usec + DEFAULT_SEND_EACH_TIME - time_pass > 999999){ /*跨秒情况*/ c_data.send_time.tv_usec = c_data.current_time.tv_usec + DEFAULT_SEND_EACH_TIME - time_pass - 1000000; c_data.send_time.tv_sec = c_data.current_time.tv_sec+1; } else { /*未跨秒情况*/ c_data.send_time.tv_sec = c_data.current_time.tv_sec; c_data.send_time.tv_usec = c_data.current_time.tv_usec + DEFAULT_SEND_EACH_TIME - time_pass; } /*计算本次需要发送的请求数*/ int velocity = c_data.current_velocity * ( DEFAULT_SEND_EACH_TIME)/1000000 ; c_data.fix_velocity += float(c_data.current_velocity) * DEFAULT_SEND_EACH_TIME /1000000 - velocity; /*使用浮点数计算对压力值的误差进行积累,并在超过1后进行修正*/ if(c_data.fix_velocity > 1.0f){ velocity += 1; c_data.fix_velocity -= 1.0f; } size_t try_num=0; size_t keep_alive_num=0; CQuery* work_query; for(int i=0;ikeepAlive && work_query->is_sock_ok()){ /*长连接,优先使用已有连接*/ //printf("keepAlive use old connection.\n"); struct epoll_event evt; evt.events = EPOLLERR | EPOLLET | EPOLLHUP | EPOLLOUT; evt.data.ptr = work_query; if(0 > epoll_ctl(g_send_epoll_fd,EPOLL_CTL_ADD,work_query->get_socket(),&evt)){ add_free_list(work_query); continue; } keep_alive_num++; } else { /*短连接*/ if( 0 > work_query->make_tcp_connect(g_pconf->ip, g_pconf->port) ){ //printf("make_tcp_connect failed: %d.\n", work_query->check_socket_err()); work_query->close_socket(); add_free_list(work_query); continue; } /*把建立的socket连接添加到send_epoll的监听队列上*/ struct epoll_event evt; evt.events = EPOLLERR | EPOLLHUP | EPOLLOUT; evt.data.ptr = work_query; if(0 > epoll_ctl(g_send_epoll_fd,EPOLL_CTL_ADD,work_query->get_socket(),&evt)){ //printf("epoll_ctl failed.\n"); work_query->close_socket(); add_free_list(work_query); continue; } } /*建立连接成功,放入work队列*/ add_work_list(work_query); i++; } printf("Add to work list, send: %d, keep_alive: %d, try-failed: %d.\n", velocity, keep_alive_num, try_num-velocity); } /*FileName: send_req.h*/ #ifndef __SEND_REQ_H__ #define __SEND_REQ_H__ #include #include #include #include "query.h" #include "query_list.h" void* send_req_main(void*); extern int g_send_epoll_fd; extern int g_recv_epoll_fd; extern bool g_over; extern size_t g_query_num; #endif /*FileName: send_req.cpp*/ #include "send_req.h" void* send_req_main(void*){ CQuery* pQuery = NULL; struct epoll_event ep_evt[g_query_num]; while(!g_over){ int ready_num = epoll_wait(g_send_epoll_fd,ep_evt,g_query_num,TIME_OUT); /*等待事件*/ //printf("send event num: %d.\n", ready_num); size_t fin_num=0, err_num=0; for(int i=0;iget_socket(),NULL); /*不再在send_epoll上监听该socket*/ if(ep_evt[i].events & EPOLLOUT){ /*为写事件*/ if(!pQuery->is_sock_ok() || 0 > pQuery->send_query()){ /*发送请求数据*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; continue; } struct epoll_event evt; evt.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLHUP | EPOLLPRI; evt.data.ptr = pQuery; if(0 > epoll_ctl(g_recv_epoll_fd,EPOLL_CTL_ADD,pQuery->get_socket(),&evt)){ /*注册在recv_epoll的监听队列上*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; continue; } fin_num++; } else { /*不是写事件*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; } } //printf("send done. event: %d, finish: %d, error: %d.\n", ready_num, fin_num, err_num); } return NULL; } /*FileName: recv_res.h*/ #ifndef __RECV_RES_h__ #define __RECV_RES_h__ #include #include #include #include "conf.h" #include "query.h" #include "query_list.h" #include "data.h" typedef int (*CALL_BACK)(char*, int); void* recv_res_main(void*); extern size_t g_query_num; extern int g_recv_epoll_fd; extern conf_t* g_pconf; extern bool g_over; #endif /*FileName: recv_res.cpp*/ #include "recv_res.h" void* recv_res_main(void* call_back_funtion){ CQuery* pQuery = NULL; struct epoll_event ep_evt[g_query_num]; while(!g_over){ int ready_num = epoll_wait(g_recv_epoll_fd,ep_evt,g_query_num,TIME_OUT); //printf("recv event num: %d.\n", ready_num); size_t fin_num=0, alive_num=0, err_num=0; for(int i=0;iget_socket(),NULL); if(ep_evt[i].events & EPOLLIN){ /*读事件*/ if( !pQuery->is_sock_ok() || 0 > pQuery->recv_reply()){ /*接收请求数据*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; continue; } //调用callBack函数校验数据 if(NULL != call_back_funtion){ (*((CALL_BACK)call_back_funtion))(pQuery->get_query_buffer(), pQuery->get_query_len()); } /*接收数据完毕*/ if( g_pconf->keepAlive){ /*长连接模式,重用socket连接,重新装填数据后放入ready队列*/ if( 0 != _read_data(pQuery) ){ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; } del_work_list(pQuery); add_ready_list(pQuery); alive_num++; } else { /*非长连接模式,直接关闭socket后放入free队列*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); fin_num++; } } else { /*非读事件*/ pQuery->close_socket(); del_work_list(pQuery); add_free_list(pQuery); err_num++; } } //printf("recv done. event: %d, finish: %d, alive: %d, error: %d.\n", ready_num, fin_num+alive_num, alive_num, err_num); } return NULL; } /*FileName: conf.h*/ #ifndef __CONF_H__ #define __CONF_H__ #include typedef struct _conf_t{ int velocity; /*目标压力*/ char ip[16]; /*目标主机ip*/ int port; /*目标主机端口*/ bool keepAlive; /*是否保存长连接*/ int BUFFER_NUM; /*申请buffer的基数,buffer数=(BUFFER_NUM*velocity) */ }conf_t; int load_config(); extern conf_t* g_pconf; #endif /*FileName: conf.cpp*/ #include "conf.h" int load_config() { g_pconf->velocity = 5000; strncpy(g_pconf->ip, "10.26.97.46", sizeof(g_pconf->ip)); g_pconf->port = 8183; g_pconf->keepAlive = true; if(g_pconf->keepAlive)g_pconf->BUFFER_NUM = 2; else g_pconf->BUFFER_NUM = 10; return 0; } /*FileName: data.h*/ #ifndef __DATA_H__ #define __DATA_H__ #include #include "query_list.h" int _read_data( CQuery* pQuery ); int read_data(); #endif /*FileName: data.cpp*/ #include "data.h" int read_data(){ CQuery* pQuery=NULL; size_t send_num=0; int read_ret=0; while( NULL != (pQuery = get_free_query()) ){ if( 0 != (read_ret=_read_data(pQuery)) ){ add_free_list(pQuery); if( 1 == read_ret ){ /*文件读完了*/ return 1; } else { /*读取遇到错误,继续尝试*/ continue; } } add_ready_list(pQuery); /*添加到ready队列*/ send_num++; } /* int flen=0,rlen=0,wlen=0; CQuery* tp = g_pfree_list; while(NULL != tp){ tp = tp->get_next_query(); flen++; } tp = g_pready_list; while(NULL != tp){ tp = tp->get_next_query(); rlen++; } tp = g_pwork_list; while(NULL != tp){ tp = tp->get_next_query(); wlen++; } */ //printf("Add to ready list: %d. free: %d, ready: %d, work: %d.\n", send_num, flen, rlen, wlen); return 0; } int _read_data( CQuery* pQuery ){ char* request = new char[MAX_QUERY_LEN+1]; char* fun_type = "GET"; char* url = "/index.html"; char* accept_type = "html/text"; char* ip = "127.0.0.1"; int port = 80; char* connection_type = NULL; if(g_pconf->keepAlive){ connection_type = "Keep-Alive"; }else{ connection_type = "Close"; } snprintf(request, MAX_QUERY_LEN+1, "%s %s HTTP/1.1\r\nAccept: %s\r\nHost: %s:%d\r\nConnection: %s\r\n\r\n", fun_type, url, accept_type, ip, port, connection_type); pQuery->set_query(request, strnlen(request, MAX_QUERY_LEN)); /*读入数据到pQuery*/ delete []request; return 0; } /*FileName: query.h*/ #ifndef __QUERY_H__ #define __QUERY_H__ #include #include #include #include #include #include #include #include #include #include //#include #include "conf.h" /*接收数据的缓冲大小*/ #define MAX_QUERY_LEN 4096 #define TIME_OUT 1000 typedef enum _QUERY_STATE_e{ QUERY_STATE_SEND=1, QUERY_STATE_RECV, QUERY_STATE_IDLE }QUERY_STATE; class CQuery{ public: CQuery() :m_socket_fd(-1) ,m_query_len(-1) { pPre_query=NULL; pNext_query=NULL; m_state = QUERY_STATE_IDLE; m_str_Query[0]='\0'; } ~CQuery(); /*操作socket*/ int make_tcp_connect(const char* pIP,const int port); int send_query(); int recv_reply(); int close_socket(); bool is_sock_ok(); int check_socket_err(); /*get & set操作*/ int set_query_sock(int sock); int get_socket(); int set_query(const char * pBuf,const int buf_len); char* get_query_buffer(); int get_query_len(); int set_pre_query(CQuery* pQuery); CQuery* get_pre_query(); int set_next_query(CQuery* pQuery); CQuery* get_next_query(); private: int m_socket_fd; //socket fd char m_str_Query[MAX_QUERY_LEN+1]; //接收的数据 int m_query_len; // query长度 CQuery* pPre_query; //上一个req CQuery* pNext_query; //下一个req QUERY_STATE m_state; }; extern conf_t* g_pconf; #endif /*FileName: query.cpp*/ #include "query.h" inline int SetSockNonblock(int sockfd) { int flag = 0; if(0 > (flag = fcntl(sockfd, F_GETFL, 0))){ /*fcntl()针对(文件)描述符提供控制;F_GETFL 取得文件描述词状态旗标,此旗标为open()的参数flags。*/ return -1; } if(0 > fcntl(sockfd, F_SETFL, flag | O_NONBLOCK)){ /*F_SETFL 设置文件描述词状态旗标,参数arg为新旗标,但只允许O_APPEND、O_NONBLOCK和O_ASYNC位的改变,其他位的改变将不受影响。此处设置为非阻塞方式*/ return -1; } return 0; } CQuery::~CQuery(){ close_socket(); }; //建立socket并对实参给出的pIP:port建立连接,就绪后保存到m_socket_fd待用 int CQuery::make_tcp_connect(const char* pIP,const int port){ if((NULL == pIP) || (0 == port)){ return -99; } if( QUERY_STATE_IDLE != m_state ){ /*如果已经建立了socket,就先关闭它再继续;socket都是在调用该函数时才关闭,故必须有这个判断和关闭*/ close_socket(); } if(0 > (m_socket_fd = socket(AF_INET, SOCK_STREAM, 0))){ /*建立socket,参数表示面向网络的连接,并且是面向连接的基于TCP的应用*/ m_socket_fd = -1; return -10; } int one = 1; if(setsockopt(m_socket_fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(int)) < 0) /*设置socket参数:允许套接口和一个已在使用中的地址捆绑*/ { close_socket(); return -2; } one = 1; if(setsockopt(m_socket_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(int)) < 0) /*设置socket参数:禁止Nagle算法。Nagle算法通过将未确认的数据存入缓冲区直到蓄足一个包一起发送的方法,来减少主机发送的零碎小数据包的数目。*/ { close_socket(); return -3; } struct linger m_linger; m_linger.l_onoff=1; m_linger.l_linger=0; if(setsockopt(m_socket_fd,SOL_SOCKET,SO_LINGER,(const char*)&m_linger,sizeof(m_linger)) SetSockNonblock(m_socket_fd)){ /*设置fd参数:设置为非阻塞模式*/ close_socket(); return -5; } struct sockaddr_in servaddr; /*保存服务器地址,用于connect()函数*/ memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port); /*将一个无符号短整型的主机数值转换为网络字节顺序,即大尾顺序(big-endian)。网络字节顺序是TCP/IP中规定好的一种数据表示格式,它与具体的CPU类型、操作系统等无关,从而可以保证数据在不同主机之间传输时能够被正确解释,网络字节顺序采用big-endian排序方式。*/ if(0 >= inet_pton(AF_INET, pIP, &(servaddr.sin_addr))){ /*Linux下IP地址转换函数,可以在将IP地址在“点分十进制”和“整数”之间转换*/ close_socket(); return -6; } int ret_code = connect(m_socket_fd, (struct sockaddr *)&servaddr, sizeof(servaddr)); /*在客户端的套接字上发送连接请求*/ if((0 > ret_code) && (EINPROGRESS != errno)){ close_socket(); return -20; } m_state=QUERY_STATE_SEND; return 0; } int CQuery::send_query(){ if( QUERY_STATE_SEND != m_state ){ close_socket(); return -1; } if ( !is_sock_ok() )return -2; int send_byte=0,have_send=0; while( have_send < m_query_len ){ /*用一个while循环不断的写入数据,但是循环过程中的buf参数和nbytes参数是我们自己来更新的。返回值大于0,表示写了部分数据或者是全部的数据。返回值小于0,此时出错了,需要根据错误类型进行相应的处理*/ send_byte = write(m_socket_fd, m_str_Query+have_send, m_query_len-have_send); /*将socket当普通文件进行读写就可以*/ if(send_byte = MAX_QUERY_LEN ) { break; } } if( have_read >= MAX_QUERY_LEN ) /*数据超过最大Query长度,则舍弃后面的数据*/ { char bad_buff[MAX_QUERY_LEN]; while( (read_byte = read(m_socket_fd, bad_buff, MAX_QUERY_LEN)) != 0 ){ if(read_byte get_next_query(); if( NULL != g_pfree_list )g_pfree_list->set_pre_query(NULL); pthread_mutex_unlock(&free_list_mutex); pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); return pQuery; } int add_free_list(CQuery* pQuery){ pthread_mutex_lock(&free_list_mutex); if(NULL == g_pfree_list){ pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); g_pfree_list = pQuery; } else { pQuery->set_pre_query(NULL); pQuery->set_next_query(g_pfree_list); pQuery->get_next_query()->set_pre_query(pQuery); g_pfree_list = pQuery; } pthread_mutex_unlock(&free_list_mutex); return 0; } CQuery* get_ready_query(){ CQuery* pQuery = NULL; pthread_mutex_lock(&ready_list_mutex); if(NULL == g_pready_list){ pthread_mutex_unlock(&ready_list_mutex); return NULL; } pQuery = g_pready_list; g_pready_list = g_pready_list->get_next_query(); if( NULL != g_pready_list )g_pready_list->set_pre_query(NULL); pthread_mutex_unlock(&ready_list_mutex); pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); return pQuery; } int add_ready_list(CQuery* pQuery){ pthread_mutex_lock(&ready_list_mutex); if(NULL == g_pready_list){ pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); g_pready_list = pQuery; } else { pQuery->set_pre_query(NULL); pQuery->set_next_query(g_pready_list); pQuery->get_next_query()->set_pre_query(pQuery); g_pready_list = pQuery; } pthread_mutex_unlock(&ready_list_mutex); return 0; } int add_work_list(CQuery* pQuery){ pthread_mutex_lock(&work_list_mutex); if(NULL == g_pwork_list){ pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); g_pwork_list = pQuery; } else { pQuery->set_pre_query(NULL); pQuery->set_next_query(g_pwork_list); pQuery->get_next_query()->set_pre_query(pQuery); g_pwork_list = pQuery; } pthread_mutex_unlock(&work_list_mutex); return 0; } int del_work_list(CQuery* pQuery){ pthread_mutex_lock(&work_list_mutex); if( NULL != pQuery->get_pre_query() ){ pQuery->get_pre_query()->set_next_query(pQuery->get_next_query()); } else { g_pwork_list = pQuery->get_next_query(); } if( NULL != pQuery->get_next_query() ){ pQuery->get_next_query()->set_pre_query(pQuery->get_pre_query()); } pthread_mutex_unlock(&work_list_mutex); pQuery->set_pre_query(NULL); pQuery->set_next_query(NULL); return 0; } #FileName: Makefile #Compile cmd: make CC = gcc CPP = g++ CXX = gcc EXECUTABLE = myPressTool all : $(EXECUTABLE) rm -f *.o clean : rm -f *.o $(EXECUTABLE) $(EXECUTABLE) : myPressTool.o conf.o control_send.o data.o query.o query_list.o recv_res.o send_req.o $(CPP) -o $@ $^ -lm -lpthread %.o : %.cpp $(CPP) -c $< -o $@ 附录A. 文件描述符fd

内核(kernel)利用文件描述符(file descriptor)来访问文件。文件描述符是非负整数。打开现存文件或新建文件时,内核会返回一个文件描述符。读写文件也需要使用文件描述符来指定待读写的文件。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。

文件描述符的范围是0 ~ OPEN_MAX ,可以在如下位置查看:/usr/include/linux/limits.h:#define OPEN_MAX 256

有三个特殊的文件描述符,每个进程在创建时,都默认打开三个文件描述符:标准输入(standard input)的文件描述符是 0,标准输出(standard output)是 1,标准错误(standard error)是 2。POSIX 定义了 STDIN_FILENO、STDOUT_FILENO 和 STDERR_FILENO 来代替 0、1、2。这三个符号常量的定义位于头文件 unistd.h。

//FileName: file_descriptor.cpp //Compile: g++ -o file_descriptor.out file_descriptor.cpp //Run: ./file_descriptor.out #include #include int main(void) { int fd; fd = open("/tmp/tmp.txt", O_RDONLY | O_CREAT, S_IRWXU | S_IRGRP | S_IROTH); /*只读打开,若不存在则创建,创建的文件属性为拥有者读写执行,组只读,其他只读*/ //打开或创建的文件会获取一个fd,且排在STDIN_FILENO、STDOUT_FILENO 和 STDERR_FILENO之后,应该为3 printf("fd:%d\n", fd); return 0; }

内核使用三种数据结构表示打开的文件:(1)文件描述符表:用户区的一部分,除非通过使用文件描述符的函数,否则程序无法对其进行访问。对进程中每个打开的文件,该表都包含一个文件描述符条目,fd值即该文件在该表中的下标索引。与每个文件描述符相关联的是:a)文件描述符标志(close_on_exec).b))指向系统文件表的某个条目的指针。(2)系统文件表:为系统中所有的进程共享。对每个活动的open, 它都包含一个条目。每个系统文件表的条目都包含文件偏移量、以及。每个文件表项包含:a)访问模式(读、写、or 读-写)b)(在内存索引节点表中)当前文件的偏移量c)指向它的文件描述符表的条目计数(3)内存索引节点表: 对系统中的每个活动的文件(被某个进程打开了),内存中索引节点表都包含一个条目。几个系统文件表条目可能对应于同一个内存索引节点表的条目(不同进程打开同一个文件)。

若2个进程同时打开一个文件做读操作,每个进程都有自己相对于文件的偏移量,而且读入整个文件是独立于另一个进程的;如果2个进程打开同一个文件做写操作,写操作是相互独立的,每个进程都可以重写另一个进程写入的内容。如果一个进程在open()以后又执行了close()函数,操作系统会删除文件描述符表的对应条目(回收fd),和系统文件表的对应条目(若指向它的描述符表唯一,若不唯一则对该条目的计数减一,不删除该条目,也不对内存索引节点表条目中的计数进行更改),并对内存索引节点表条目中的计数减1,如果自减以后变为0,说明没有其他进程链接此文件,将索引节点表条目也删除。

 

通过fork()创建子进程时,子进程继承父进程的文件描述符表,即子进程完全复制父进程的文件描述符表。对于父进程在fork()之前打开的文件来说,子进程都会继承,与父进程共享相同的文件偏移量并相互影响(例如STDIN_FILENO、STDOUT_FILENO 和 STDERR_FILENO ),即对应文件描述符表的条目均指向相同的系统文件表条目。在fork()之后父进程(或子进程)打开的文件,不与子进程(或父进程)共享文件偏移量并相互独立无影响。即对应文件描述符表的条目指向新增的系统文件表条目,并不相同。

 

附录B. 多进程

多进程的实例程序如下:

//FileName: multi_pid_example.cpp //Compile: g++ -o multi_pid_example.out multi_pid_example.cpp //Run: ./multi_pid_example.out #include #include #include #include #include //linux进程使用的内存空间分为代码段、堆栈段和数据段: //代码段用来存放程序执行代码,也有可能包含一些只读的常数变量,例如字符串常量等;只读可执行。 //堆栈段:Stack,存放局部变量和临时变量如复杂表达式的中间变量;向下增长。 //数据段:其空间自下而上又分为:初始化数据区域:已赋值的全局变量和静态变量;BSS:未初始化全局变量和静态变量并在运行前由系统进行清零操作;堆;Heap,例如malloc动态申请的内存;堆向上增长,栈向下增长,两者相对,之间有空洞区域。 //可执行代码(linux下为ELF格式)中存储了.text,.data,.bss三个段,其中.bss段只在Section header table中描述端的起始偏移位置和长度,并不分配实际的段数据,也不占用空间。 void print_exit() { printf("the exit pid:%d\n",getpid() ); /*打印进程pid*/ } int main () { pid_t pid; atexit( print_exit ); /*注册该进程退出时的回调函数*/ //fork后,子进程会复制父进程的task_struct结构,并为子进程的堆栈分配物理页。子进程和父进程使用相同的代码段;子进程复制父进程的堆栈段和数据段。 //写时复制:一般CPU都是以"页"为单位来分配内存空间的,每一个页都是实际物理内存的一个映像,象INTEL的CPU,其一页在通常情况下是4086字节大小,而无论是数据段还是堆栈段都是由许多"页"构成的,fork函数复制这两个段,只是"逻辑"上的,并非"物理"上的,也就是说,实际执行fork时,物理空间上两个进程的数据段和堆栈段都还是共享着的,当有一个进程写了某个数据时,这时两个进程之间的数据才有了区别,系统就将有区别的"页"从物理上也分开。 //子进程一旦开始运行,子进程和父进程之间就已经不再共享任何数据了。它们再要交互信息时,只有通过进程间通信来实现。 pid=fork(); /*产生子进程*/ //fork产生子进程的表现就是它会返回2次 //一次返回0,顺序执行下面的代码。这是子进程。 //一次返回子进程的pid,也顺序执行下面的代码,这是父进程。 int i; if (pid


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有