✨✨欢迎来到T_X_Parallel的博客!! 🛰️博客主页:T_X_Parallel 🛰️项目代码仓库:Reactor模型高并发服务器项目代码仓库(随博客更新) 🛰️专栏 : Reactor模型高并发服务器项目 🛰️欢迎关注:👍点赞🙌收藏✍️留言
文章目录
-
- 1. 前言
- 2. 模块分析
- 3. 模块实现
-
- Channel模块
- Poller模块
- TimerWheel模块
- EventLoop模块
- 4. 模块测试
- 5. 模块代码
-
- Channel模块代码
- Poller模块代码
- TimerWheel模块代码
- EventLoop模块代码
- 事件循环监控完整代码
项目环境:vscode、wsl2(Ubuntu22.04)
技术栈:C/C++、C++11、C++17、STL、HTTP、TCP、HTML
1. 前言
虽然标题中只是一个事件循环监控模块,但是这模块中有四个小模块相互关联起来组成的模块,分别为Channel模块、Poller模块、TimerWheel模块、EventLoop模块,准确来说,EventLoop模块将前三个模块的功能集成起来
这个模块的主要作用就是监控描述符的事件,并根据触发的事件调用指定的函数进行处理,同时当监控监听套接字事件时根据用户需求进行超时连接释放操作,所以模块中使用了回调函数、I/O多路复用、时间轮等
2. 模块分析
上面说了这个模块主要由四个子模块组成,那么先来一一介绍这四个模块的作用以及需要实现的功能,然后再分析四个模块之间的联系,以便于下面实现时能够理解实现的过程的原因
该模块就是存放描述符触发事件的事件处理回调函数的模块,所以需要指定需要监控哪个描述符和监控什么事件以及当前触发的事件,然后去选择使用对应的事件处理函数进行处理。一个描述符的事件大致可分为读事件、写事件、错误事件、关闭事件,所以需要实现能设置监听什么事件以及当前触发事件的功能,还有需要实现设置这些事件触发后的事件处理回调函数的功能,最后就是实现一个调用对应事件处理回调函数的功能
事件处理函数中可以多实现一个任意事件触发后都执行的回调函数,因为启动超时释放功能后,连接的任意事件都应该去刷新时间轮的定时任务,添加这个任意事件处理回调函数就是为了超时释放刷新实现的
当然不止这些元素,等四个模块的关系梳理清楚后就会知道还要添加,下面的三个子模块也是同样道理,现在只是实现了子模块自己该有的元素
这个模块主要是对高级I/O多路复用的接口进行封装,因为是高性能并发服务器,所以这里面使用epoll来实现多路复用监控描述符的各种事件的触发
这个模块中肯定有的成员是epollfd描述符,然后就是一个存储监听描述符触发什么事件的数组(为了节省空间,就不在监控功能接口中创建该数组),最后就是一个该epoll中监控的所有描述符以及对应的Channel对象,为了方便查找使用哈希表存储这个对应关系,使用描述符fd即可找到对应的Channel对象
该模块所要实现的功能就清晰了,就是添加或更新监控、移除监控、以及监控事件并设置对应Channel中的当前触发事件
该模块的设计以及实现在该项目博客中的第二篇项目准备中已经完成,但是这里需要添加一个东西让这个时间轮能够自动转动而不需要依靠外部,这里可以添加最初实现定时任务的方案用到的Linux提供的计时器(这两个设计思想在01_项目准备博客中详细讲解过,如有细节遗忘,请跳转至该篇博客),自动转动的实现也借助了上面的两个模块实现的,这部分内容在下面的关联讲解中会详细阐述
这个模块实现的功能与之前实现的大致相同,只有在轮转指针的移动功能中由于计时器的加入而有区别,具体在模块实现中会阐述
该模块是该项目的核心之一,该模块也可以说是之前说的Reactor模型,所以这个模块肯定是在多个线程中有多个对象,如果出现错误的对象分配那么就会产生线程安全问题,如果使用锁来处理,那么会阻塞导致性能瓶颈,这里的处理思想就是只要任务(这里的任务不是指事件处理,而是指一些比如定时任务添加刷新等涉及线程安全问题的任务)的执行只要在同一个线程中执行即可,那么可以将这些任务这里可以给每个EventLoop对象分配一个线程,即一个线程一个Eventloop对象,对象中存有对应的线程ID,只有线程对应上了就直接执行任务,不然就将任务加入到任务队列中,而只要确保这个任务队列的执行只在对应线程中即可解决线程安全问题,也就是说其他线程中的对象只能将任务加入到任务队列中,而该对象对应的线程可以直接执行任务以及任务队列中的任务
该模块中主要的成员肯定包括上面的三个模块的对象,还有就是对应的线程ID、任务队列以及Linux提供的事件描述符及对应的Channel对象,这个事件描述符是用来避免当将任务加入任务队列时其他没有事件触发就会阻塞在epoll等待位置,只需在加入任务后向该描述符发送数据,而这个描述符添加了可读事件监控,那么就会触发事件来唤醒进程从而去执行任务队列中的任务,能够使任务及时处理
该模块要实现的功能就是对定时任务的添加、刷新、删除以及查找的功能、在Poller中添加或更新、删除事件监控功能、添加任务进EventLoop中的功能、循环监控事件执行对应描述符触发事件的事件处理回调函数并执行任务队列中任务的功能,可以看出这些功能都关联了上面的模块
根据上面EventLoop模块的设计思想中不难看出eventloop模块将前三个模块进行关联起来了,可以说前三个模块都服务与EventLoop模块,也可以说前三个模块使EventLoop模块的子模块
模块间的关系博主也不多说,只要仔细研究上面的模块设计就可以理解其中的关系,下面给出一张关系图就明了了
注:由于这四个模块互相有其他模块的对象指针或对象,而且调用了成员函数,所以不能分别放在不同的头文件中,而需要放在同一个文件中来实现,不然会出编译冲突
结合上面几个模块的成员图来看这张关系图,基本上就能理清楚中间的关联了
3. 模块实现
Channel模块
Channel模块主要的设计理念上面也已经阐述过了,模块中的成员和需要实现的功能大致也提及了,下面就是对接口以及一些重要部分的详细阐述
模块接口
对外接口
- 获取描述符fd接口
- 获取当前监控的事件接口
- 设置当前监控的描述符所触发的事件接口
- 设置可读、可写、错误、关闭、任意事件处理回调函数接口
- 判断是否监控了可读事件、可写事件接口
- 启动可读事件、可读事件监控接口
- 关闭可读事件、可写事件、所有事件监控
- 添加或更新、移除Poller中的监控接口
- 执行对应事件处理回调函数接口
// 获取描述符fd
inline int GetFd();
// 获取当前监控的事件
inline uint32_t GetEvent();
// 设置当前监控的描述符所触发的事件
void SetEvent(const uint32_t event);
// 设置可读事件处理回调函数
void SetReadCallBack(const EventCallBack &cb);
// 设置可写事件处理回调函数
void SetWriteCallBack(const EventCallBack &cb);
// 设置错误事件处理回调函数
void SetErrorCallBack(const EventCallBack &cb);
// 设置关闭事件处理回调函数
void SetCloseCallBack(const EventCallBack &cb);
// 设置任意事件处理回调函数
void SetAnyEventCallBack(const EventCallBack &cb);
// 判断是否监控了可读事件
bool IsReadEvent();
// 判断是否监控了可写事件
bool IsWriteEvent();
// 启动可读事件监控
void EnableRead();
// 启动可写事件监控
void EnableWrite();
// 关闭可读事件监控
void DisableRead();
// 关闭可写事件监控
void DisableWrite();
// 关闭所有事件监控
void DisableEvent();
// 添加或更新Poller中的监控
void Update();
// 移除Poller中的监控
void Remove();
// 执行事件处理回调函数
void HandleEvent();
事件状态码了解
常见事件状态码及其含义
- 值: 0x001
- 含义: 表示文件描述符上有可读事件(例如,套接字接收到数据)。
- 值: 0x004
- 含义: 表示文件描述符上有可写事件(例如,可以向套接字发送数据)。
- 值: 0x002
- 含义: 表示文件描述符上有紧急数据可读(例如,带外数据)。
- 值: 0x008
- 含义: 表示文件描述符发生错误。
- 值: 0x010
- 含义: 表示文件描述符被挂起或对端关闭连接。
- 值: 0x2000
- 含义: 表示对端关闭连接或半关闭(常用于检测对端关闭的场景)。
- 值: 0x80000000
- 含义: 表示使用边缘触发模式(Edge Triggered),需要一次性处理所有事件。
- 值: 0x40000000
- 含义: 表示一次性事件,事件触发后需要重新注册。
代码中使用的状态码
在代码中,以下状态码被使用:
- EPOLLIN: 用于检测可读事件。
- EPOLLOUT: 用于检测可写事件。
- EPOLLPRI: 用于检测紧急数据。
- EPOLLERR: 用于检测错误事件。
- EPOLLHUP: 用于检测挂起或关闭事件。
- EPOLLRDHUP: 用于检测对端关闭连接。
模块实现细节
由于需要添加、更新、移除epoll中的事件监控,按道理类中需要一个Poller对象,但是上面提及过EventLoop需要的功能,EventLoop中实现了在Poller中添加或更新、删除事件监控功能,所以只需要一个EventLoop对象即可实现该功能,但是EventLoop是主模块,那么应该使用EventLoop指针,在每个Channel类实例化时传入一个EventLoop对象指针即可。
在实现获取监控事件以及启动和关闭事件监控功能中可以使用位运算来实现,还有执行任意事件处理回调函数的位置应该在每个事件事件之前执行,因为这些事件处理函数执行完会使连接断开且释放,导致执行任意事件处理回调函数失败而导致程序崩溃
还有添加、更新、移除epoll中的事件监控接口的实现需要等EventLoop模块实现完再实现,实现在EventLoop模块后面实现,类中声明,类外实现,否则会出现找不到函数的编译错误,因为这些接口使用了EventLoop中的接口来实现的,下面TimerWheel模块中的一些接口是同样的道理
由于在Epoll中除了可读和可写事件,错误事件和关闭事件的触发是无条件的,即epoll不设置也可以监控错误事件和关闭事件,所以在功能中不需要设计启动关闭错误事件以及关闭事件监控的接口
具体的实现代码请看文章结尾的模块代码部分
Poller模块
这个模块实现主要就是对epoll接口操作的封装,只要了解接口的使用即可,有些实现细节博主会在下面提及
模块接口
对外接口
- 添加或更新事件监控接口
- 删除事件监控接口
- 获取触发事件接口
// 更新或添加事件监控
void Update(Channel *ch);
// 移除事件监控
void Remove(Channel *ch);
// 获取触发事件
void Poll(std::vector<Channel *> &actives);
内部接口
- 判断在监控事件中是否存在对应的Channel对象接口
- 修改事件监控接口
// 判断监听事件中是否存在对应的Channel
bool ExistChannel(const int fd);
// 修改epoll监控的事件
void _Update(Channel *ch, int op);
epoll接口了解
- int epoll_create(int size);
参数
- size
- 这是一个历史遗留参数,表示建议的文件描述符数量。
- 在早期的 Linux 内核中,这个参数用于指定 epoll 实例中可以监控的文件描述符的最大数量。
- 现代 Linux 内核(2.6.8 及之后)中,该参数已被忽略,但仍然需要传递一个大于 0 的值,否则会返回错误。
返回值
- 成功
- 返回一个文件描述符(epoll 实例的标识符),可以用于后续的 epoll_ctl 和 epoll_wait 操作。
- 失败
- 返回-1,并设置errno来指示错误原因。常见的错误包括:
- EINVAL: 参数 size 小于等于 0。
- ENOMEM: 系统内存不足,无法分配资源。
- EMFILE: 进程已达到文件描述符的上限。
- 返回-1,并设置errno来指示错误原因。常见的错误包括:
- size
- int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数
-
epfd:
- 由 epoll_create 或 epoll_create1 返回的 epoll 实例的文件描述符。
- 用于标识当前的 epoll 实例。
-
events:
-
指向一个 epoll_event 数组的指针,用于存储触发的事件。
-
调用成功后,该数组会被填充为已触发的事件信息。
-
每个epoll_event结构包含以下字段:
struct epoll_event {
uint32_t events; // 事件类型,例如 EPOLLIN, EPOLLOUT 等
epoll_data_t data; // 用户数据,可以是文件描述符或指针
};
-
-
maxevents:
- 指定 events 数组的大小,即最多可以返回的事件数量。
- 必须大于 0,否则会返回错误。
-
timeout:
- 指定等待事件发生的超时时间(以毫秒为单位)。
- 取值范围:
- timeout > 0: 等待指定的毫秒数。
- timeout == 0: 不等待,立即返回。
- timeout == -1: 无限期等待,直到有事件发生。
- 成功:
- 返回触发的事件数量(即 events 数组中填充的事件个数)。
- 如果返回值为 0,表示在指定的超时时间内没有任何事件发生。
- 失败:
- 返回-1,并设置errno来指示错误原因。常见错误包括:
- EBADF: 参数 epfd 不是有效的文件描述符。
- EFAULT: 参数 events 指向的内存不可访问。
- EINVAL: 参数 epfd 无效,或 maxevents 小于等于 0。
- EINTR: 调用被信号中断。
- 返回-1,并设置errno来指示错误原因。常见错误包括:
返回值
- int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数
-
epfd:
- 由 epoll_create 或 epoll_create1 返回的 epoll 实例的文件描述符。
- 用于标识当前的 epoll 实例。
-
op:
- 指定要执行的操作类型,取值如下:
- EPOLL_CTL_ADD: 将文件描述符 fd 添加到 epoll 实例中。
- EPOLL_CTL_MOD: 修改已注册的文件描述符 fd 的事件。
- EPOLL_CTL_DEL: 从 epoll 实例中删除文件描述符 fd。
- 指定要执行的操作类型,取值如下:
-
fd:
- 需要操作的目标文件描述符(例如套接字、管道等)。
- 必须是一个有效的文件描述符。
-
event:
-
指向一个 epoll_event 结构体,用于描述要监听的事件类型和关联的数据。
-
仅在 op 为 EPOLL_CTL_ADD 或 EPOLL_CTL_MOD 时需要,EPOLL_CTL_DEL 时可以为 NULL。
-
epoll_event结构体定义如下:
struct epoll_event {
uint32_t events; // 事件类型,例如 EPOLLIN, EPOLLOUT 等
epoll_data_t data; // 用户数据,可以是文件描述符或指针
}; -
常见的事件类型:
- EPOLLIN: 可读事件。
- EPOLLOUT: 可写事件。
- EPOLLERR: 错误事件。
- EPOLLHUP: 挂起事件。
- EPOLLET: 边缘触发模式。
- EPOLLONESHOT: 一次性事件。
-
- 成功:
- 返回 0,表示操作成功。
- 失败:
- 返回-1,并设置errno来指示错误原因。常见错误包括:
- EBADF: 参数 epfd 或 fd 不是有效的文件描述符。
- EEXIST: 使用 EPOLL_CTL_ADD 时,文件描述符 fd 已经存在。
- EINVAL: 参数无效,例如 epfd 不是 epoll 文件描述符,或 op 不合法。
- ENOENT: 使用 EPOLL_CTL_MOD 或 EPOLL_CTL_DEL 时,文件描述符 fd 不存在。
- ENOMEM: 系统内存不足,无法完成操作。
- 返回-1,并设置errno来指示错误原因。常见错误包括:
返回值
模块实现细节
该模块只是对系统接口的封装没有多少实现细节,主要就是要判断这几个系统接口处理的返回值是否正常,不正常打印出日志,到时候出错可以查看哪里出错进行修正
唯一需要注意的细节是Poller中描述符与Channel对应的哈希表中存储的是对应Channel对象的指针,这样在监听到事件触发后可以直接修改对应Channel中的当前触发事件成员
具体的实现代码请看文章结尾的模块代码部分
TimerWheel模块
这个模块的设计以及实现在项目准备部分已经详细阐述过了,同时Linux定时器的参数和返回值以及使用方法已经展示过了,就不重复了,下面就展示一些新增接口以及一些实现细节
新增接口
对外接口
- 运转时间轮接口
内部接口
- 从定时器描述符中读取超时了多少次接口
模块实现细节
首先就是对增添、刷新、删除定时任务的接口进行封装,封装时需要将该任务放入EventLoop中的任务队列中,因为存在线程安全问题
其次是判断某定时任务是否存在存在线程安全问题,由于该函数具有返回值无法放入任务队列,可以直接加锁解决,也可以在整体实现上让该函数只在对应的EventLoop线程中执行即可
然后就是新增接口,一个是从定时器中读取超时次数,定时器是内核在每次超时会让定时器中超时次数加一,然后读取后超时次数重置,那么只需设置定时器超时时间为一秒,然后让Poller监控定时器描述符的可读事件监控,然后将运转时间轮接口设置成定时器描述符对应的Channel中的可读事件处理回调函数,当超时时内核向描述符中写数据就会触发描述符的可读事件,就会运转时间轮,而运转时间轮前先读出超时次数(因为可能会因为其他原因阻塞导致不能第一时间读出超时次数,后面在程序调试过程中需要注意这个问题,可能在调试的环境下有些会出错,因为调试的时候定时器是正常运作的,该定时器的原理就是基于Linux开机时间作为相对时间计算的超时次数)超时多少次就将指针向后移动多少步
最后对定时任务的操作封装肯定要调用EventLoop中的接口,那么像Channel模块一样成员中加入一个EventLoop对象指针,同样,函数目前是在类内声明,实现需要放到EventLoop模块后面进行实现
具体的实现代码请看文章结尾的模块代码部分
EventLoop模块
该模块主要就是对上面模块的整合,基本实现思想并不复杂,只要理清这几个模块之间的关系即可
模块接口
对外接口
- 将任务放入任务队列接口
- 对判断是否处在对应线程操作进行断言接口
- 开始事件轮询监控执行接口
- 执行任务接口
- 更新、删除Poller中的事件监控接口
- 添加、刷新、删除定时任务接口
- 判断定时任务是否存在接口
// 将任务放入任务队列中
void InsertTaskPool(const Function &cb);
// 对判断对应线程操作进行断言
void AssertInLoop();
// 开始事件轮询监控执行
void Start();
// 执行任务,如在对应线程直接执行,不在则放入任务队列中
void RunInLoop(const Function &cb);
// 更新Poller中的事件监控
void UpdateChannel(ns_eventloop::Channel *ch);
// 移除Poller中的事件监控
void RemoveChannel(ns_eventloop::Channel *ch);
//添加定时任务
void AddTimerTask(uint64_t id, uint64_t timeout, const Function &cb);
// 刷新定时任务
void RefreshTimerTask(uint64_t id);
// 取消定时任务
void CancelTimerTask(uint64_t id);
// 判断定时任务是否存在
bool ExistTimerTask(uint64_t id);
内部接口
- 执行任务队列中的任务接口
- 创建事件描述符fd接口
- 读取事件描述符的数据
- 向事件描述符中写入数据
- 判断当前是否处于对应线程接口
// 执行任务队列中的任务
void RunAllTask();
// 创建事件描述符fd
static int CreateEventFd();
// 读取事件描述符的数据
void ReadEventFd();
// 向事件描述符中写入数据
void WeakUpEventFd();
// 判断当前是否处于对应线程
bool IsInLoop();
模块实现细节
首先就是和上面时间轮一样的操作,将事件描述符的监控添加进Poller中并设置指定Channel对象的可读事件处理回调函数为读取事件描述符的数据接口,然后启动Channel中的可读事件监控,每次将任务放入任务队列后就会唤醒Poller中等待事件就绪的阻塞,然后就会立即执行任务队列中的任务
其次就是在执行任务队列中的任务时可以使用一个临时对象来存放从任务队列中取出的任务,当然这个过程需要加锁,避免线程安全问题,取出之后直接逐一实现即可
然后获取当前线程可以使用std::this_thread::get_id()接口
最后就是开始事件轮询监控执行应该使用一个while(1)实现事件监控并执行的轮询
具体的实现代码请看文章结尾的模块代码部分
4. 模块测试
测试代码
// 服务端
#include "../code/socket.hpp"
#include "../code/log.hpp"
#include "../code/eventloop.hpp"
void HandleClose(ns_eventloop::Channel *ch)
{
ch->Remove();
LOG(NORMAL, "Close Fd: " + std::to_string(ch->GetFd()));
delete ch;
}
void HandleRead(ns_eventloop::Channel *ch)
{
int fd = ch->GetFd();
char buf[1024] = {0};
int ret = recv(fd, buf, 1023, 0);
if (ret <= 0)
{
HandleClose(ch);
return;
}
LOG(NORMAL, buf);
ch->EnableWrite();
}
void HandleWrite(ns_eventloop::Channel *ch)
{
int fd = ch->GetFd();
const char *s = "xiaomi su7max";
int ret = send(fd, s, strlen(s), 0);
if (ret < 0)
{
HandleClose(ch);
return;
}
ch->DisableWrite();
}
void HandleError(ns_eventloop::Channel *ch)
{
HandleClose(ch);
}
void HandleAnyEvent(ns_eventloop::EventLoop *loop, uint64_t id, ns_eventloop::Channel *ch)
{
loop->RefreshTimerTask(id);
}
void Accept(ns_eventloop::EventLoop *loop, ns_eventloop::Channel *ls_ch)
{
int fd = ls_ch->GetFd();
int newfd = accept(fd, NULL, NULL);
if (newfd < 0)
{
LOG(DEBUG, "accept failed");
return;
}
uint64_t id = fd;
ns_eventloop::Channel *newch = new ns_eventloop::Channel(newfd, loop);
newch->SetReadCallBack(std::bind(HandleRead, newch));
newch->SetWriteCallBack(std::bind(HandleWrite, newch));
newch->SetErrorCallBack(std::bind(HandleError, newch));
newch->SetCloseCallBack(std::bind(HandleClose, newch));
newch->SetAnyEventCallBack(std::bind(HandleAnyEvent, loop, id, newch));
loop->AddTimerTask(id, 10, std::bind(HandleClose, newch));
newch->EnableRead();
}
int main()
{
ns_eventloop::EventLoop loop;
ns_socket::Socket svr_socket;
svr_socket.CreateServer(8081);
ns_eventloop::Channel channel(svr_socket.GetFd(), &loop);
channel.SetReadCallBack(std::bind(&Accept, &loop, &channel));
channel.EnableRead();
loop.Start();
svr_socket.Close();
return 0;
}
// 客户端
#include "../code/socket.hpp"
#include <unistd.h>
int main()
{
ns_socket::Socket cli_socket;
cli_socket.CreateClient(8081, "127.0.0.1");
for (int i = 0; i < 5; i++)
{
std::string str = "xiaomi su7ultra";
cli_socket.Send(str.c_str(), str.size());
char buf[1024] = {0};
cli_socket.Recv(&buf, 1023);
LOG(NORMAL, buf);
sleep(1);
}
while(1);
return 0;
}
上面的测试代码测试了事件监控以及超时连接释放等功能
测试结果
./Server
[Level# NORMAL][time# 2025–04–04 21:34:30][File# test_eventloop_server.cc][Line# 27]Message# xiaomi su7ultra
[Level# NORMAL][time# 2025–04–04 21:34:31][File# test_eventloop_server.cc][Line# 27]Message# xiaomi su7ultra
[Level# NORMAL][time# 2025–04–04 21:34:32][File# test_eventloop_server.cc][Line# 27]Message# xiaomi su7ultra
[Level# NORMAL][time# 2025–04–04 21:34:33][File# test_eventloop_server.cc][Line# 27]Message# xiaomi su7ultra
[Level# NORMAL][time# 2025–04–04 21:34:34][File# test_eventloop_server.cc][Line# 27]Message# xiaomi su7ultra
[Level# NORMAL][time# 2025–04–04 21:34:44][File# test_eventloop_server.cc][Line# 10]Message# Close Fd: 7
./Client
[Level# NORMAL][time# 2025–04–04 21:34:30][File# test_eventloop_client.cc][Line# 29]Message# xiaomi su7max
[Level# NORMAL][time# 2025–04–04 21:34:31][File# test_eventloop_client.cc][Line# 29]Message# xiaomi su7max
[Level# NORMAL][time# 2025–04–04 21:34:32][File# test_eventloop_client.cc][Line# 29]Message# xiaomi su7max
[Level# NORMAL][time# 2025–04–04 21:34:33][File# test_eventloop_client.cc][Line# 29]Message# xiaomi su7max
[Level# NORMAL][time# 2025–04–04 21:34:34][File# test_eventloop_client.cc][Line# 29]Message# xiaomi su7max
测试结果来看都正常,10秒也能正常连接释放
注:如果程序出现任何错误,第一时间先看代码是否出现一眼就能看到的错误,然后再调试找细小错误,如果实在找不出,将问题以及出错位置发到评论区,博主会进行帮助的
5. 模块代码
Channel模块代码
class EventLoop;
typedef std::function<void()> EventCallBack;
class Channel
{
private:
int _fd;
// ns_poller::Poller *_poller;
EventLoop *_loop;
uint32_t _event; // 监听事件
uint32_t _revent; // 当前就绪事件
EventCallBack _read_callback; // 读事件触发回调函数
EventCallBack _write_callback; // 写事件触发回调函数
EventCallBack _error_callback; // 错误事件触发回调函数
EventCallBack _close_callback; // 连接关闭事件触发回调函数
EventCallBack _anyevent_callback; // 任意事件触发回调函数
public:
Channel(int fd, EventLoop *loop)
: _fd(fd), _event(0), _revent(0), _loop(loop)
{
}
// 获取描述符fd
inline int GetFd()
{
return _fd;
}
// 获取当前监控的事件
inline uint32_t GetEvent()
{
return _event;
}
// 设置当前监控的描述符所触发的事件
void SetEvent(const uint32_t event)
{
_revent = event;
}
// 设置可读事件处理回调函数
void SetReadCallBack(const EventCallBack &cb)
{
_read_callback = cb;
}
// 设置可写事件处理回调函数
void SetWriteCallBack(const EventCallBack &cb)
{
_write_callback = cb;
}
// 设置错误事件处理回调函数
void SetErrorCallBack(const EventCallBack &cb)
{
_error_callback = cb;
}
// 设置关闭事件处理回调函数
void SetCloseCallBack(const EventCallBack &cb)
{
_close_callback = cb;
}
// 设置任意事件处理回调函数
void SetAnyEventCallBack(const EventCallBack &cb)
{
_anyevent_callback = cb;
}
// 判断是否监控了可读事件
bool IsReadEvent()
{
return (_event & EPOLLIN);
}
// 判断是否监控了可写事件
bool IsWriteEvent()
{
return (_event & EPOLLOUT);
}
// 启动可读事件监控
void EnableRead()
{
_event |= EPOLLIN;
Update();
}
// 启动可写事件监控
void EnableWrite()
{
_event |= EPOLLOUT;
Update();
}
// 关闭可读事件监控
void DisableRead()
{
_event &= ~EPOLLIN;
Update();
}
// 关闭可写事件监控
void DisableWrite()
{
_event &= ~EPOLLOUT;
Update();
}
// 关闭所有事件监控
void DisableEvent()
{
_event = 0;
Update();
}
// 添加或更新Poller中的监控
void Update();
// 移除Poller中的监控
void Remove();
// 执行事件处理回调函数
void HandleEvent()
{
if ((_revent & EPOLLIN) || (_revent & EPOLLRDHUP) || (_revent & EPOLLPRI))
{
if (_anyevent_callback)
_anyevent_callback();
// LOG(DEBUG, "READ");
if (_read_callback)
_read_callback();
}
if (_revent & EPOLLOUT)
{
if (_anyevent_callback)
_anyevent_callback();
// LOG(DEBUG, "Write");
if (_write_callback)
_write_callback();
}
else if (_revent & EPOLLERR)
{
if (_anyevent_callback)
_anyevent_callback();
// LOG(DEBUG, "ERROR");
if (_error_callback)
_error_callback();
}
else if (_revent & EPOLLHUP)
{
if (_anyevent_callback)
_anyevent_callback();
// LOG(DEBUG, "CLOSE");
if (_close_callback)
_close_callback();
}
}
};
//…
//EventLoop模块实现完
void Channel::Update()
{
_loop->UpdateChannel(this);
}
void Channel::Remove()
{
_loop->RemoveChannel(this);
}
Poller模块代码
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
int _epollfd;
struct epoll_event _evts[MAX_EPOLLEVENTS];
std::unordered_map<int, Channel *> _channels;
public:
Poller()
{
_epollfd = epoll_create(MAX_EPOLLEVENTS);
if (_epollfd < 0)
{
LOG(FATAL, "Epollfd Create Failed");
}
}
// 更新或添加事件监控
void Update(Channel *ch)
{
bool ret = ExistChannel(ch->GetFd());
if (ret == false)
{
_channels[ch->GetFd()] = ch;
_Update(ch, EPOLL_CTL_ADD);
}
else
_Update(ch, EPOLL_CTL_MOD);
}
// 移除事件监控
void Remove(Channel *ch)
{
auto iter = _channels.find(ch->GetFd());
if (iter != _channels.end())
{
_channels.erase(iter);
}
_Update(ch, EPOLL_CTL_DEL);
}
// 获取触发事件
void Poll(std::vector<Channel *> &actives)
{
int nfds = epoll_wait(_epollfd, _evts, MAX_EPOLLEVENTS, –1);
if (nfds < 0)
{
if (errno == EINTR)
return;
LOG(ERROR, "Epoll Wait Failed");
}
for (int i = 0; i < nfds; i++)
{
auto iter = _channels.find(_evts[i].data.fd);
if (iter == _channels.end())
LOG(ERROR, "Fd No Find");
iter->second->SetEvent(_evts[i].events);
actives.push_back(iter->second);
}
}
private:
// 判断监听事件中是否存在对应的Channel
bool ExistChannel(const int fd)
{
auto iter = _channels.find(fd);
if (iter == _channels.end())
return false;
return true;
}
// 修改epoll监控的事件
void _Update(Channel *ch, int op)
{
int fd = ch->GetFd();
struct epoll_event evt;
evt.events = ch->GetEvent();
evt.data.fd = fd;
int ret = epoll_ctl(_epollfd, op, fd, &evt);
if (ret < 0)
{
LOG(ERROR, "Epoll Update Failed");
}
}
};
TimerWheel模块代码
#define MAX_TIMEOUT 60
typedef std::function<void()> Function;
class TimerTask
{
private:
uint64_t _id; // 任务ID
bool _canceled; // 任务状态
int _timeout; // 设置的超时时间
// 定时任务回调函数
Function _taskcallback;
// 将任务完全从时间轮中释放掉的回调函数,就是释放掉哈希表中的weak_ptr
Function _releasecallback;
public:
// 构造函数
TimerTask(uint64_t id, int timeout)
: _id(id), _canceled(false), _timeout(timeout)
{
}
// 析构函数,条件满足再执行定时任务和释放的回调函数
~TimerTask()
{
if (_releasecallback)
_releasecallback();
if (_taskcallback && !_canceled)
_taskcallback();
}
public:
// 设置定时任务回调函数
void SetTaskCallBack(const Function &taskcallback)
{
_taskcallback = taskcallback;
}
// 设置释放的回调函数
void SetReleaseCallBack(const Function &releasecallback)
{
_releasecallback = releasecallback;
}
// 获取定时任务的超时时间
int GetTimeOut()
{
return _timeout;
}
// 取消定时任务
void TaskCancel()
{
_canceled = true;
}
};
typedef std::shared_ptr<TimerTask> TimerTask_SharedPtr;
typedef std::weak_ptr<TimerTask> TimerTask_WeakPtr;
class TimerWheel
{
private:
int _tick; // 指针
int _cap; // 时间轮容量
// 二维数组的时间轮
std::vector<std::vector<TimerTask_SharedPtr>> _clock;
// 存储对象的weak_ptr的哈希表
std::unordered_map<uint64_t, TimerTask_WeakPtr> _timertaskhash;
std::mutex _mtx;
EventLoop *_loop;
int _timerfd;
Channel _timerfd_channel;
public:
// 构造函数
TimerWheel(EventLoop *loop)
: _tick(0), _cap(MAX_TIMEOUT), _clock(_cap), _loop(loop), _timerfd(CreateTimerFd()), _timerfd_channel(_timerfd, _loop)
{
_timerfd_channel.SetReadCallBack(std::bind(&TimerWheel::RunTimerTask, this));
_timerfd_channel.EnableRead();
}
public:
// 运转时间轮
void RunTimerTask()
{
int times = ReadTimerFd();
while (times—)
{
MoveTick();
}
}
// 判断某定时任务是否存在
bool ExistTimerTask(uint64_t id)
{
{
std::unique_lock<std::mutex> _lock(_mtx);
auto iter = _timertaskhash.find(id);
if (iter == _timertaskhash.end())
return false;
return true;
}
}
// 添加定时任务
void AddTimerTask(uint64_t id, uint64_t timeout, const Function &cb);
// 刷新定时任务
void RefreshTimerTask(uint64_t id);
// 取消定时任务
void CancelTimerTask(uint64_t id);
private:
// 从定时器描述符中读取超时了多少次
int ReadTimerFd()
{
uint64_t times = 0;
int ret = read(_timerfd, ×, sizeof(times));
if (ret < 0)
LOG(ERROR, "Timerfd Read Failed");
return times;
}
// 添加定时任务
void _AddTimerTask(uint64_t id, uint64_t timeout, const Function &cb)
{
if (timeout > _cap || timeout < 0)
return;
TimerTask_SharedPtr timertask(new TimerTask(id, timeout));
timertask->SetTaskCallBack(cb);
timertask->SetReleaseCallBack(std::bind(&TimerWheel::DeleteHashKey, this, id));
_clock[(_tick + timeout) % _cap].push_back(timertask);
_timertaskhash[id] = TimerTask_WeakPtr(timertask);
}
// 刷新定时任务
void _RefreshTimerTask(uint64_t id)
{
auto iter = _timertaskhash.find(id);
if (iter == _timertaskhash.end())
return LOG(WARNING, "No Exist " + std::to_string(id) + " Task");
TimerTask_SharedPtr newtimertask = iter->second.lock();
int delay_pos = (newtimertask->GetTimeOut() + _tick) % _cap;
_clock[delay_pos].push_back(newtimertask);
}
// 取消定时任务
void _CancelTimerTask(uint64_t id)
{
auto iter = _timertaskhash.find(id);
if (iter == _timertaskhash.end())
LOG(WARNING, "No Exist " + std::to_string(id) + " Task");
iter->second.lock()->TaskCancel();
}
// 指针移动
void MoveTick()
{
_tick = (_tick + 1) % _cap;
_clock[_tick].clear();
}
static int CreateTimerFd()
{
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timerfd < 0)
{
LOG(ERROR, "Timerfd Create Failed");
}
struct itimerspec itm;
itm.it_interval.tv_nsec = 0;
itm.it_interval.tv_sec = 1;
itm.it_value.tv_nsec = 0;
itm.it_value.tv_sec = 1;
timerfd_settime(timerfd, 0, &itm, NULL);
return timerfd;
}
// 传给TimerTask做释放回调,删除哈希表中该对象的weak_ptr
void DeleteHashKey(uint64_t id)
{
auto iter = _timertaskhash.find(id);
if (iter != _timertaskhash.end())
_timertaskhash.erase(iter);
}
};
// …
// EventLoop实现
// 添加定时任务
void TimerWheel::AddTimerTask(uint64_t id, uint64_t timeout, const Function &cb)
{
_loop->RunInLoop(std::bind(&TimerWheel::_AddTimerTask, this, id, timeout, cb));
}
// 刷新定时任务
void TimerWheel::RefreshTimerTask(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::_RefreshTimerTask, this, id));
}
// 取消定时任务
void TimerWheel::CancelTimerTask(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::_CancelTimerTask, this, id));
}
EventLoop模块代码
typedef std::function<void()> Function;
class EventLoop
{
private:
std::thread::id _thread_id;
int _event_fd;
Channel _event_channel;
Poller _poller;
std::vector<Function> _tasks_pool;
std::mutex _mutex;
TimerWheel _timerwheel;
public:
EventLoop()
: _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(_event_fd, this), _timerwheel(this)
{
_event_channel.SetReadCallBack(std::bind(&EventLoop::ReadEventFd, this));
_event_channel.EnableRead();
}
private:
// 执行任务队列中的任务
void RunAllTask()
{
std::vector<Function> tmp;
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks_pool.swap(tmp);
}
for (auto &f : tmp)
f();
}
// 创建事件描述符fd
static int CreateEventFd()
{
int evfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (evfd < 0)
{
LOG(ERROR, "Create Eventfd Failed");
}
return evfd;
}
// 读取事件描述符的数据
void ReadEventFd()
{
uint64_t res = 0;
int ret = read(_event_fd, &res, sizeof(res));
if (ret < 0)
{
if (errno == EINTR)
LOG(ERROR, "Interrupted By Signal");
else if (errno == EAGAIN)
LOG(ERROR, "No Data Readable");
else
LOG(ERROR, "Read Eventfd Failed");
}
}
// 向事件描述符中写入数据
void WeakUpEventFd()
{
uint64_t val = 1;
int ret = write(_event_fd, &val, sizeof(val));
if (ret < 0)
{
if (errno == EINTR)
LOG(ERROR, "Interrupted By Signal");
else
LOG(ERROR, "Write Eventfd Failed");
}
}
// 判断当前是否处于对应线程
bool IsInLoop()
{
return _thread_id == std::this_thread::get_id();
}
public:
// 将任务放入任务队列中
void InsertTaskPool(const Function &cb)
{
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks_pool.push_back(cb);
}
WeakUpEventFd();
}
// 对判断对应线程操作进行断言
void AssertInLoop()
{
assert(IsInLoop());
}
// 开始事件轮询监控执行
void Start()
{
while (1)
{
std::vector<Channel *> actives;
_poller.Poll(actives);
for (auto &ch : actives)
ch->HandleEvent();
RunAllTask();
}
}
// 执行任务,如在对应线程直接执行,不在则放入任务队列中
void RunInLoop(const Function &cb)
{
if (IsInLoop())
cb();
else
InsertTaskPool(cb);
}
// 更新Poller中的事件监控
void UpdateChannel(ns_eventloop::Channel *ch)
{
_poller.Update(ch);
}
// 移除Poller中的事件监控
void RemoveChannel(ns_eventloop::Channel *ch)
{
_poller.Remove(ch);
}
//添加定时任务
void AddTimerTask(uint64_t id, uint64_t timeout, const Function &cb)
{
_timerwheel.AddTimerTask(id, timeout, cb);
}
// 刷新定时任务
void RefreshTimerTask(uint64_t id)
{
_timerwheel.RefreshTimerTask(id);
}
// 取消定时任务
void CancelTimerTask(uint64_t id)
{
_timerwheel.CancelTimerTask(id);
}
// 判断定时任务是否存在
bool ExistTimerTask(uint64_t id)
{
return _timerwheel.ExistTimerTask(id);
}
};
事件循环监控完整代码
#pragma once
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <memory>
#include <functional>
#include <unordered_map>
#include <cstring>
#include <cassert>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include "log.hpp"
namespace ns_eventloop
{
class EventLoop;
typedef std::function<void()> EventCallBack;
class Channel
{
private:
int _fd;
// ns_poller::Poller *_poller;
EventLoop *_loop;
uint32_t _event; // 监听事件
uint32_t _revent; // 当前就绪事件
EventCallBack _read_callback; // 读事件触发回调函数
EventCallBack _write_callback; // 写事件触发回调函数
EventCallBack _error_callback; // 错误事件触发回调函数
EventCallBack _close_callback; // 连接关闭事件触发回调函数
EventCallBack _anyevent_callback; // 任意事件触发回调函数
public:
Channel(int fd, EventLoop *loop)
: _fd(fd), _event(0), _revent(0), _loop(loop)
{
}
// 获取描述符fd
inline int GetFd()
{
return _fd;
}
// 获取当前监控的事件
inline uint32_t GetEvent()
{
return _event;
}
// 设置当前监控的描述符所触发的事件
void SetEvent(const uint32_t event)
{
_revent = event;
}
// 设置可读事件处理回调函数
void SetReadCallBack(const EventCallBack &cb)
{
_read_callback = cb;
}
// 设置可写事件处理回调函数
void SetWriteCallBack(const EventCallBack &cb)
{
_write_callback = cb;
}
// 设置错误事件处理回调函数
void SetErrorCallBack(const EventCallBack &cb)
{
_error_callback = cb;
}
// 设置关闭事件处理回调函数
void SetCloseCallBack(const EventCallBack &cb)
{
_close_callback = cb;
}
// 设置任意事件处理回调函数
void SetAnyEventCallBack(const EventCallBack &cb)
{
_anyevent_callback = cb;
}
// 判断是否监控了可读事件
bool IsReadEvent()
{
return (_event & EPOLLIN);
}
// 判断是否监控了可写事件
bool IsWriteEvent()
{
return (_event & EPOLLOUT);
}
// 启动可读事件监控
void EnableRead()
{
_event |= EPOLLIN;
Update();
}
// 启动可写事件监控
void EnableWrite()
{
_event |= EPOLLOUT;
Update();
}
// 关闭可读事件监控
void DisableRead()
{
_event &= ~EPOLLIN;
Update();
}
// 关闭可写事件监控
void DisableWrite()
{
_event &= ~EPOLLOUT;
Update();
}
// 关闭所有事件监控
void DisableEvent()
{
_event = 0;
Update();
}
// 添加或更新Poller中的监控
void Update();
// 移除Poller中的监控
void Remove();
// 执行事件处理回调函数
void HandleEvent()
{
if ((_revent & EPOLLIN) || (_revent & EPOLLRDHUP) || (_revent & EPOLLPRI))
{
if (_anyevent_callback)
_anyevent_callback();
// LOG(DEBUG, "READ");
if (_read_callback)
_read_callback();
}
if (_revent & EPOLLOUT)
{
if (_anyevent_callback)
_anyevent_callback();
// LOG(DEBUG, "Write");
if (_write_callback)
_write_callback();
}
else if (_revent & EPOLLERR)
{
if (_anyevent_callback)
_anyevent_callback();
// LOG(DEBUG, "ERROR");
if (_error_callback)
_error_callback();
}
else if (_revent & EPOLLHUP)
{
if (_anyevent_callback)
_anyevent_callback();
// LOG(DEBUG, "CLOSE");
if (_close_callback)
_close_callback();
}
}
};
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
int _epollfd;
struct epoll_event _evts[MAX_EPOLLEVENTS];
std::unordered_map<int, Channel *> _channels;
public:
Poller()
{
_epollfd = epoll_create(MAX_EPOLLEVENTS);
if (_epollfd < 0)
{
LOG(FATAL, "Epollfd Create Failed");
}
}
// 更新或添加事件监控
void Update(Channel *ch)
{
bool ret = ExistChannel(ch->GetFd());
if (ret == false)
{
_channels[ch->GetFd()] = ch;
_Update(ch, EPOLL_CTL_ADD);
}
else
_Update(ch, EPOLL_CTL_MOD);
}
// 移除事件监控
void Remove(Channel *ch)
{
auto iter = _channels.find(ch->GetFd());
if (iter != _channels.end())
{
_channels.erase(iter);
}
_Update(ch, EPOLL_CTL_DEL);
}
// 获取触发事件
void Poll(std::vector<Channel *> &actives)
{
int nfds = epoll_wait(_epollfd, _evts, MAX_EPOLLEVENTS, –1);
if (nfds < 0)
{
if (errno == EINTR)
return;
LOG(ERROR, "Epoll Wait Failed");
}
for (int i = 0; i < nfds; i++)
{
auto iter = _channels.find(_evts[i].data.fd);
if (iter == _channels.end())
LOG(ERROR, "Fd No Find");
iter->second->SetEvent(_evts[i].events);
actives.push_back(iter->second);
}
}
private:
// 判断监听事件中是否存在对应的Channel
bool ExistChannel(const int fd)
{
auto iter = _channels.find(fd);
if (iter == _channels.end())
return false;
return true;
}
// 修改epoll监控的事件
void _Update(Channel *ch, int op)
{
int fd = ch->GetFd();
struct epoll_event evt;
evt.events = ch->GetEvent();
evt.data.fd = fd;
int ret = epoll_ctl(_epollfd, op, fd, &evt);
if (ret < 0)
{
LOG(ERROR, "Epoll Update Failed");
}
}
};
#define MAX_TIMEOUT 60
typedef std::function<void()> Function;
class TimerTask
{
private:
uint64_t _id; // 任务ID
bool _canceled; // 任务状态
int _timeout; // 设置的超时时间
// 定时任务回调函数
Function _taskcallback;
// 将任务完全从时间轮中释放掉的回调函数,就是释放掉哈希表中的weak_ptr
Function _releasecallback;
public:
// 构造函数
TimerTask(uint64_t id, int timeout)
: _id(id), _canceled(false), _timeout(timeout)
{
}
// 析构函数,条件满足再执行定时任务和释放的回调函数
~TimerTask()
{
if (_releasecallback)
_releasecallback();
if (_taskcallback && !_canceled)
_taskcallback();
}
public:
// 设置定时任务回调函数
void SetTaskCallBack(const Function &taskcallback)
{
_taskcallback = taskcallback;
}
// 设置释放的回调函数
void SetReleaseCallBack(const Function &releasecallback)
{
_releasecallback = releasecallback;
}
// 获取定时任务的超时时间
int GetTimeOut()
{
return _timeout;
}
// 取消定时任务
void TaskCancel()
{
_canceled = true;
}
};
typedef std::shared_ptr<TimerTask> TimerTask_SharedPtr;
typedef std::weak_ptr<TimerTask> TimerTask_WeakPtr;
class TimerWheel
{
private:
int _tick; // 指针
int _cap; // 时间轮容量
// 二维数组的时间轮
std::vector<std::vector<TimerTask_SharedPtr>> _clock;
// 存储对象的weak_ptr的哈希表
std::unordered_map<uint64_t, TimerTask_WeakPtr> _timertaskhash;
EventLoop *_loop;
int _timerfd;
Channel _timerfd_channel;
public:
// 构造函数
TimerWheel(EventLoop *loop)
: _tick(0), _cap(MAX_TIMEOUT), _clock(_cap), _loop(loop), _timerfd(CreateTimerFd()), _timerfd_channel(_timerfd, _loop)
{
_timerfd_channel.SetReadCallBack(std::bind(&TimerWheel::RunTimerTask, this));
_timerfd_channel.EnableRead();
}
public:
// 运转时间轮
void RunTimerTask()
{
int times = ReadTimerFd();
while (times—)
{
MoveTick();
}
}
// 判断某定时任务是否存在
bool ExistTimerTask(uint64_t id)
{
auto iter = _timertaskhash.find(id);
if (iter == _timertaskhash.end())
return false;
return true;
}
// 添加定时任务
void AddTimerTask(uint64_t id, uint64_t timeout, const Function &cb);
// 刷新定时任务
void RefreshTimerTask(uint64_t id);
// 取消定时任务
void CancelTimerTask(uint64_t id);
private:
// 从定时器描述符中读取超时了多少次
int ReadTimerFd()
{
uint64_t times = 0;
int ret = read(_timerfd, ×, sizeof(times));
if (ret < 0)
LOG(ERROR, "Timerfd Read Failed");
return times;
}
// 添加定时任务
void _AddTimerTask(uint64_t id, uint64_t timeout, const Function &cb)
{
if (timeout > _cap || timeout < 0)
return;
TimerTask_SharedPtr timertask(new TimerTask(id, timeout));
timertask->SetTaskCallBack(cb);
timertask->SetReleaseCallBack(std::bind(&TimerWheel::DeleteHashKey, this, id));
_clock[(_tick + timeout) % _cap].push_back(timertask);
_timertaskhash[id] = TimerTask_WeakPtr(timertask);
}
// 刷新定时任务
void _RefreshTimerTask(uint64_t id)
{
auto iter = _timertaskhash.find(id);
if (iter == _timertaskhash.end())
return LOG(WARNING, "No Exist " + std::to_string(id) + " Task");
TimerTask_SharedPtr newtimertask = iter->second.lock();
int delay_pos = (newtimertask->GetTimeOut() + _tick) % _cap;
_clock[delay_pos].push_back(newtimertask);
}
// 取消定时任务
void _CancelTimerTask(uint64_t id)
{
auto iter = _timertaskhash.find(id);
if (iter == _timertaskhash.end())
LOG(WARNING, "No Exist " + std::to_string(id) + " Task");
iter->second.lock()->TaskCancel();
}
// 指针移动
void MoveTick()
{
_tick = (_tick + 1) % _cap;
_clock[_tick].clear();
}
static int CreateTimerFd()
{
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timerfd < 0)
{
LOG(ERROR, "Timerfd Create Failed");
}
struct itimerspec itm;
itm.it_interval.tv_nsec = 0;
itm.it_interval.tv_sec = 1;
itm.it_value.tv_nsec = 0;
itm.it_value.tv_sec = 1;
timerfd_settime(timerfd, 0, &itm, NULL);
return timerfd;
}
// 传给TimerTask做释放回调,删除哈希表中该对象的weak_ptr
void DeleteHashKey(uint64_t id)
{
auto iter = _timertaskhash.find(id);
if (iter != _timertaskhash.end())
_timertaskhash.erase(iter);
}
};
typedef std::function<void()> Function;
class EventLoop
{
private:
std::thread::id _thread_id;
int _event_fd;
Channel _event_channel;
Poller _poller;
std::vector<Function> _tasks_pool;
std::mutex _mutex;
TimerWheel _timerwheel;
public:
EventLoop()
: _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(_event_fd, this), _timerwheel(this)
{
_event_channel.SetReadCallBack(std::bind(&EventLoop::ReadEventFd, this));
_event_channel.EnableRead();
}
private:
// 执行任务队列中的任务
void RunAllTask()
{
std::vector<Function> tmp;
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks_pool.swap(tmp);
}
for (auto &f : tmp)
f();
}
// 创建事件描述符fd
static int CreateEventFd()
{
int evfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (evfd < 0)
{
LOG(ERROR, "Create Eventfd Failed");
}
return evfd;
}
// 读取事件描述符的数据
void ReadEventFd()
{
uint64_t res = 0;
int ret = read(_event_fd, &res, sizeof(res));
if (ret < 0)
{
if (errno == EINTR)
LOG(ERROR, "Interrupted By Signal");
else if (errno == EAGAIN)
LOG(ERROR, "No Data Readable");
else
LOG(ERROR, "Read Eventfd Failed");
}
}
// 向事件描述符中写入数据
void WeakUpEventFd()
{
uint64_t val = 1;
int ret = write(_event_fd, &val, sizeof(val));
if (ret < 0)
{
if (errno == EINTR)
LOG(ERROR, "Interrupted By Signal");
else
LOG(ERROR, "Write Eventfd Failed");
}
}
// 判断当前是否处于对应线程
bool IsInLoop()
{
return _thread_id == std::this_thread::get_id();
}
public:
// 将任务放入任务队列中
void InsertTaskPool(const Function &cb)
{
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks_pool.push_back(cb);
}
WeakUpEventFd();
}
// 对判断对应线程操作进行断言
void AssertInLoop()
{
assert(IsInLoop());
}
// 开始事件轮询监控执行
void Start()
{
while (1)
{
std::vector<Channel *> actives;
_poller.Poll(actives);
for (auto &ch : actives)
ch->HandleEvent();
RunAllTask();
}
}
// 执行任务,如在对应线程直接执行,不在则放入任务队列中
void RunInLoop(const Function &cb)
{
if (IsInLoop())
cb();
else
InsertTaskPool(cb);
}
// 更新Poller中的事件监控
void UpdateChannel(ns_eventloop::Channel *ch)
{
_poller.Update(ch);
}
// 移除Poller中的事件监控
void RemoveChannel(ns_eventloop::Channel *ch)
{
_poller.Remove(ch);
}
//添加定时任务
void AddTimerTask(uint64_t id, uint64_t timeout, const Function &cb)
{
_timerwheel.AddTimerTask(id, timeout, cb);
}
// 刷新定时任务
void RefreshTimerTask(uint64_t id)
{
_timerwheel.RefreshTimerTask(id);
}
// 取消定时任务
void CancelTimerTask(uint64_t id)
{
_timerwheel.CancelTimerTask(id);
}
// 判断定时任务是否存在
bool ExistTimerTask(uint64_t id)
{
return _timerwheel.ExistTimerTask(id);
}
};
void Channel::Update()
{
_loop->UpdateChannel(this);
}
void Channel::Remove()
{
_loop->RemoveChannel(this);
}
// 添加定时任务
void TimerWheel::AddTimerTask(uint64_t id, uint64_t timeout, const Function &cb)
{
_loop->RunInLoop(std::bind(&TimerWheel::_AddTimerTask, this, id, timeout, cb));
}
// 刷新定时任务
void TimerWheel::RefreshTimerTask(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::_RefreshTimerTask, this, id));
}
// 取消定时任务
void TimerWheel::CancelTimerTask(uint64_t id)
{
_loop->RunInLoop(std::bind(&TimerWheel::_CancelTimerTask, this, id));
}
} // namespace ns_eventloop
下一篇博客是Connection模块的实现,这个模块实现过程比较复杂,这个模块也是项目的核心模块
专栏:Reactor模型高并发服务器项目 项目代码仓库:Reactor模型高并发服务器项目代码仓库(随博客更新) 都看到这里了,留下你们的珍贵的👍点赞+⭐收藏+📋评论吧
评论前必须登录!
注册