云计算百科
云计算领域专业知识百科平台

linux网络编程 (3) 什么是reactor?如何实现服务器百万并发

文章目录

  • 一、什么是reactor?
  • 二、reactor的基本概念
  • 三、reactor的工作流程
  • 四、通过epoll实现一个reactor
    • 1.设计连接池
    • 2、事件源关联事件
    • 3、定义回调函数
    • 4、设置回调函数
    • 5 、主循环
  • 五、服务器百万并发
      • 1.socket:Too many open files 操作系统限制文件打开数量
      • 2.修改 vim /etc/security/limits.conf
      • 3.系统资源限制
      • 4.内核模块未加载

一、什么是reactor?

reactor是一种事件驱动编程手段,常用于高并发的网络I/O编程中。它用于处理大量的客户端请求,根据I/O上触发的不同事件,执行不同的回调函数。

二、reactor的基本概念

事件源:指可以触发事件的一些实体,比如网络I/O套接字,当套接字状态发生变化,比如有数据可读、有数据可写时,会触发一个事件。 事件循环: reactor模式依赖于一个事件循环机制,它会不断的检查事件源上是否有事件发生,当有事件发生后,会把事件分发下去。 事件处理: 事件源上不同的事件,都有自己独立的事件处理函数,处理具体的业务逻辑。

三、reactor的工作流程

1.监听事件:reactor 会监听所有注册的事件源(例如套接字),并等待事件的发生。 2.事件触发: 当某个事件源准备好进行某种操作时(例如网络连接有数据可读),Reactor 就会触发事件。 3.分发事件: Reactor 会将事件分发给合适的事件处理器。每个事件处理器负责处理特定类型的事件,如数据读取、写入等。 4.处理事件: 事件处理器会执行与事件相关的操作,比如读取数据、处理请求、发送响应等。

四、通过epoll实现一个reactor

1.设计连接池

首先我们需要定义一个连接池,定义连接池的目的主要是把每个I/O连接和一组数据关联起来,通过文件描述符就可以很方便的找到和其相关联的数据,连接池定义如下:

typedef struct epollConnInfo
{
int fd;
char wBuffer[BUFFER_LENGTH];
int wLength;
char rBuffer[BUFFER_LENGTH];
int rLength;

RCALLBACK send_callback;

union
{
RCALLBACK accept_callback;
RCALLBACK recv_callback;
} r_action;

}EPOLLCONNINFO;

连接池中包含了文件描述符,收数据buffer、收数据长度、发数据buffer、发数据长度,以及不同的回调函数。对于不同的fd,不同的事件,会触发不同的回调函数。 我们使用每个fd即作为连接池的下标,方便访问。我们把连接池的大小设置为1048576,方便我们实现服务器百万级并发:

#define CONNECTION_SIZE1048576
struct conn conn_list[CONNECTION_SIZE] = {0};

2、事件源关联事件

事件源即为sockfd,一个网络I/O,分为listenfd和clientfd。事件即为可读或者可写,在epoll中事件源关联事件,即将fd添加到epoll的红黑树上,并设置监听事件,通过epoll进行管理,我们定义如下函数:

int set_event(int fd,int event,int mode)
{
struct epoll_event ev;
ev.events = event; //可读 EPOLLIN 可写 EPOLLOUT
ev.data.fd = fd;
return epoll_ctl(epFd,mode,fd,&ev);
}

其中mode为EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL。

3、定义回调函数

需要定义三个回调函数

int accept_cb(int listenFd)
{
//当监听套接字上有可读事件时触发此函数,即有新连接连入
struct sockaddr_in clientSock;
socklen_t len = sizeof(struct sockaddr);
int clientFd = accept(listenFd,(struct sockaddr*)&clientSock,&len);
if(clientFd < 0)
{
printf("error! accept_cb error !listenfd: %d",listenFd);
return clientFd;
}
//客户端套接字和连接池绑定
conn_list[clientFd].fd = clientFd;
conn_list[clientFd].r_action.accept_callback = recv_cb;
conn_list[clientFd].send_callback = send_cb;

memset(conn_list[clientFd].wBuffer,0,BUFFER_LENGTH);
memset(conn_list[clientFd].rBuffer,0,BUFFER_LENGTH);

//客户端套接字加入epoll管理
set_event(clientFd,EPOLLIN,EPOLL_CTL_ADD);
}

int recv_cb(int fd)
{
//客户端套接字可读
memset(conn_list[fd].rBuffer, 0, BUFFER_LENGTH );
int length = recv(fd,conn_list[fd].rBuffer,BUFFER_LENGTH,0);
if(length == 0)
{
//该连接被断开
printf("client disconnect: %d\\n", fd);
close(fd);
epoll_ctl(epFd,EPOLL_CTL_DEL,fd,NULL);
return 0;
}else if (length < 0)
{
printf("count: %d, errno: %d, %s\\n", length, errno, strerror(errno));
close(fd);
epoll_ctl(epFd,EPOLL_CTL_DEL,fd,NULL);
}
memcpy(conn_list[fd].wBuffer, conn_list[fd].rBuffer, length);
conn_list[fd].wLength = length;
set_event(fd,EPOLLOUT,EPOLL_CTL_MOD); //修改客户端fd的epoll事件,关注可写事件
return 0;
}

int send_cb(int fd)
{
//客户端fd可写
printf("send_cb action! clientfd:%d \\n",fd);
send(fd,conn_list[fd].wBuffer,conn_list[fd].wLength,0);
set_event(fd, EPOLLIN, EPOLL_CTL_MOD);
return 0;
}

4、设置回调函数

回调函数在连接池中,我们通过fd即可访问fd关联的连接池。在listenfd和clientfd创建时,设置它们的回调函数。 listenfd:

//把监听套接字和连接池关联
conn_list[listenFd].fd = listenFd;
conn_list[listenFd].r_action.accept_callback = accept_cb;

clientfd:

//客户端套接字和连接池绑定
conn_list[clientFd].fd = clientFd;
conn_list[clientFd].r_action.accept_callback = recv_cb;
conn_list[clientFd].send_callback = send_cb;

memset(conn_list[clientFd].wBuffer,0,BUFFER_LENGTH);
memset(conn_list[clientFd].rBuffer,0,BUFFER_LENGTH);

5 、主循环

while(1)
{
//主循环
struct epoll_event evs[EPOLLEVENT_SIZE] = {0};
int occuredEvents = epoll_wait(epFd,evs,EPOLLEVENT_SIZE,1);//occuredEvents是执行一次epollwait时发生事件的套接字数量

for(int i = 0;i < occuredEvents;i++)
{
//根据fd发生的事件 调用不同的回调函数
if(evs[i].events & EPOLLIN)
{
conn_list[evs[i].data.fd].r_action.accept_callback(evs[i].data.fd);
}
if(evs[i].events & EPOLLOUT)
{
conn_list[evs[i].data.fd].send_callback(evs[i].data.fd);
}
}
}

通过上述5个步骤,我们就通过epoll实现了一个简易的reator,并且可通过该reactor实现百万级并发。

五、服务器百万并发

影响服务器并发量的因素有如下:

1.socket:Too many open files 操作系统限制文件打开数量

通过ulimit -a 查看数量 如上图,这里的数量是65535,可通过ulimit -n 1048576 设置为百万级别。

2.修改 vim /etc/security/limits.conf

在这里插入图片描述 把这个值也设置为 1048576

3.系统资源限制

通常一个tcp连接有如下五元组成 服务器ip 服务器端口 客户端ip 客户端端口 网络协议 我们可以在服务器开放多个端口,也就是创建多个listenfd,这样可以提高服务器的并发数量。

4.内核模块未加载

若出现错误 在这里插入图片描述执行 sudo modprobe ip_conntrack

参考学习

https://github.com/0voice

赞(0)
未经允许不得转载:网硕互联帮助中心 » linux网络编程 (3) 什么是reactor?如何实现服务器百万并发
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!