根据前文Epoll原理可知,利用Epoll机制实现IO多路复用的关键是三大组件:
-
- 利用epoll_create创建epoll实例,管理文件描述符以及事件;
- 利用epoll_ctl将文件描述符(监听描述符和数据传输描述符)以及感兴趣事件添加到epoll实例中
- 利用epoll_wait获取各文件描述符实际发生的事件并进行想用的读写操作
本系统服务端将以上面的逻辑作为工作主线,结合多线程编程思想实现监听(主循环)和IO操作解耦操作,以适应高并发连接下的数据传输操作。
开始之前,首先讲解多线程编程的思想,完成服务器的框架构建。
在多线程编程中,常常涉及到线程并发,常见的两种问题如下:
在本系统中,主循环负责监听连接以及IO操作请求,工作线程负责实现具体的IO操作,很明显,两者存在依赖关系,因此利用条件变量进行解决。
为什么要使用条件变量?
因为如果不使用条件变量,线程就需要 轮询+休眠 来查看是否满足条件,这样严重影响效率。
5.1 条件变量的原理
条件变量总是需要与互斥量结合使用,互斥量能限制一个线程能够访问共享资源,条件变量是在共享变量状态改变时发出通知。
典型的模型就是生产者-消费者模型:
生产者:生产数据的线程或者进程
消费者:消费数据的线程或者进程
生产者和消费者彼此不通信,生产者不需要等待消费者是否消费者处理,生产者直接往阻塞队列中生产数据,消费者不找生产者要数据,直接从阻塞队列中拿数据并处理数据,阻塞队列是一个数据缓冲区,这个阻塞队列将消费数据和生产数据进行了解耦。
条件变量主要包含以下两个函数:
该函数会自动释放与之关联的互斥锁,阻塞所在线程,并将该线程加入到等待当前条件变量的线程列表中。当有其他线程调用当前条件变量的 notify_one() 或 notify_all() 成员函数时,该线程才会继续执行。但由于唤醒也可能是虚假唤醒,因此每次结束阻塞后都应立即重新检查自己所等待的条件是否满足。
该函数有一个回调函数作为自己的参数——这个回调函数的作用,就是用来检查本次唤醒是否为虚假唤醒。
当线程解除阻塞后,wait() 函数会重新对互斥量加锁,调用回调函数检查条件是否满足。若条件不满足,则该函数会以一次原子操作释放锁,然后阻塞当前线程,并将当前线程加入到等待当前条件变量的线程列表中。
若有多个线程在等待同一个条件变量,则 notify_one() 会只唤醒这些阻塞线程中的一个线程。
若有多个线程在等待同一个条件变量,则 notify_all() 会唤醒所有的线程。
- 线程 1 调用条件变量的 wait() 函数,该函数内部会对一个互斥量加锁,检查条件是否满足;
- 若该条件没有满足,则 wait() 函数会释放锁,然后该函数将等待条件变量得到外部的信号(从外部看,此时的线程正在 wait() 函数这一句话中阻塞等待)。(条件变量的 wait() 函数能够保证相关操作的原子性);
- 另一个线程(比如线程 2 )在条件1所需的条件已经准备好了之后,对(线程1正在等待的那个)条件变量发出信号;
- 一旦条件变量得到信号,线程 1 就会被唤醒(但此时仍然在 wait() 函数中,只不过是继续执行 wait() 函数了),唤醒的第一件事是再次加锁,然后通过用户定义的检查函数来检查条件变量相关联的条件真的被满足了:如果真的满足了,那 wait() 函数就会返回,线程 1 真正开始继续运行。如果检查的结果是并没有真正满足,那就意味着这次运行只是一次虚假唤醒(spurious wakeup); 如果有多个线程正在等待信号,可以使用 notify_one 来只激活其中一个等待的线程。
- 若此次运行是一次虚假唤醒,该函数就会重新调用 wait() 函数,继续等待真正的信号来临。
5.2 本系统设计与实现
本系统服务端设计框架如上主要包含三大部件:
本系统的模式架构分为两大模块(Reactor模式):
代码的逻辑思路是:
客户端代码逻辑比较简单,就不再赘述,下附源码:
main.cpp
#include <iostream>
#include <thread>
#include "TcpserverEpoll.h"
#include "Tcpclient.h"
void run_server() {
// 创建一个监听IP地址为127.0.0.1,端口为8080,线程数为4的服务器
Tcpserver server("127.0.0.1", 8080, 4);
// 启动服务器
server.start();
}
void run_client() {
// 创建客户端,连接到127.0.0.1:8080
Tcpclient client("127.0.0.1", 8080);
if (client.connection()) {
std::cout << "Connected to server" << std::endl;
// 发送数据
std::string message = "Hello, Server!";
client.clisend(message);
// 接收数据
std::string response;
client.clirecv(response);
std::cout << "Received from server: " << response << std::endl;
} else {
std::cout << "Failed to connect to server" << std::endl;
}
}
int main() {
// 启动服务器线程
std::thread server_thread(run_server);
// 等待服务器启动
std::this_thread::sleep_for(std::chrono::seconds(1));
// 启动客户端线程
std::thread client_thread(run_client);
// 等待线程完成
server_thread.join();
client_thread.join();
return 0;
}
TcpserverEpoll.h
/*
利用epoll机制代替poll机制,无需poll机制下每次调用时复制整个文件描述符集合,只需要关注实际发生的事件
采用reactor模式:将主循环和事件处理逻辑分离开,主循环负责等待就绪事件分发给工作线程,工作线程利用线程池并行处理实际发生的事件
服务器类:
1、套接字初始化和连接
2、数据读写
定义以下结构:
1、Tcpserver类:套接字初始化、连接、事件分发
2、事件处理接口
2、连接事件处理器
*/
#pragma once
#include<iostream>
using namespace std;
#include <sys/epoll.h>
#include<vector>
#include<functional>
#include <condition_variable>
#include<queue>
//Reactor模式中的事件处理器接口
class EventHandler{
public:
virtual ~EventHandler() {}
virtual void handleInput(int sockfd) = 0;
virtual void handleOutput(int sockfd) = 0;
virtual int getsocket() = 0;
};
//Tcpserver主类
class Tcpserver{
public:
//构造函数,初始化线程数、服务器ip和端口号
Tcpserver(string ip,int port,int numThreads);
//初始化套接字并绑定地址,监听连接请求
void init_server_socket();
//接收连接
void accept_connection();
//具体实现初始化套接字和epoll实例,绑定地址,监听链接请求,接收链接,读写数据
void start();
//工作线程
void work_thread();
//分发事件给工作线程,负责把就绪事件的线程数组分发给工作线程,实现多线程处理
void distribute_events(vector<EventHandler*>&& handlers);
// //发送数据
// void sersend(int socket,string& data);
// //接收数据
// ssize_t serrecv(int socket,string& show_inbuff);
private:
string s_ip;
int s_port;
int listensock;
//线程数量
int m_numThreads;
//epoll实例
int epfd;
//结构体数组evlist,用于保存实际发生的事件信息
vector<struct epoll_event> events;
//多线程哈希表,用于保存多个客户端和服务器的连接与IO操作信息
//智能指针:unique_ptr来管理EventHandler实例,避免内存泄露。【智能指针的用法与原理还需要复习!!!】
//用于将每个客户端的套接字(datasock)映射到一个 std::unique_ptr,该指针指向一个 EventHandler 对象,
//用父类进行定义,可以使得在hash表中保存不同类型的子类对象,实现多态
unordered_map<int,unique_ptr<EventHandler>>m_handlers;
//多线程与互斥锁相关变量
//事件队列,用于后续分发给工作线程
queue<EventHandler*> m_eventQueue;
//定义事件互斥锁,用于保护共享资源,防止多个线程同时访问该资源导致的数据竞争和不一致性
//过互斥锁,确保在同一时刻只有一个线程可以访问共享资源
mutex m_eventQueueMutex;
//条件变量用于线程间的通信,允许一个线程等待另一个线程发出的信号。
condition_variable m_eventQueueCond;
};
//连接处理器:连接、数据读写
class ConnectionHandler:public EventHandler{
public:
//连接器初始化,用于客户端与服务器连接的套接字传递
ConnectionHandler(int socket):m_sockfd(socket){};
//服务器读取数据并处理
void handleInput(int sockfd);
//服务器输出数据并处理
void handleOutput(int sockfd);
//获取服务器套接字
int getsocket();
private:
int m_sockfd;
};
TcpserverEpoll.cpp
#include<iostream>
using namespace std;
#include "TcpserverEpoll.h"
#include<sys/socket.h>
#include <arpa/inet.h>
#include <cstring>
#include <unistd.h>
#include <sys/epoll.h>
#include <thread>
Tcpserver::Tcpserver(string ip,int port,int numThreads):s_ip(ip),s_port(port),m_numThreads(numThreads),events(1024){};
void Tcpserver::init_server_socket(){
//初始化服务器套接字:IPV4,TCP字节流,默认协议
listensock=socket(AF_INET,SOCK_STREAM,0);
if (listensock < 0) {
perror("socket error");
exit(EXIT_FAILURE);
}
//绑定地址
struct sockaddr_in servaddr;
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family=AF_INET;
//此ip和port是用户指定的
servaddr.sin_addr.s_addr=inet_addr(s_ip.c_str());
servaddr.sin_port=htons(s_port);
if (bind(listensock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) {
perror("bind error");
close(listensock);
exit(EXIT_FAILURE);
}
//监听链接请求,后面的数字是监听的线程数
if (listen(listensock, m_numThreads) < 0) {
perror("listen error");
close(listensock);
exit(EXIT_FAILURE);
}
cout<<"server listensock= "<<listensock<<endl;
}
//利用epoll_ctl方法将数据传输套接字添加到epoll实例中
void Tcpserver::accept_connection(){
struct sockaddr_in cliaddr;
socklen_t cliaddelen=sizeof(cliaddr);\\
//创建客户端服务器数据文件描述符,为后续数据传输做准备
int datasock=accept(listensock,(struct sockaddr*)&cliaddr,&cliaddelen);
cout<<"client datasock= "<<datasock<<endl;
//利用多线程管理多个客户端与服务器的连接,采用hash表结构(key:datasock,value:ConnectionHandler)
//使用 std::make_unique 创建一个 ConnectionHandler 对象的 std::unique_ptr,并将其存储在哈希表中。
//这样,哈希表中的每个条目都管理一个客户端连接。
m_handlers[datasock]=make_unique<ConnectionHandler>(datasock);
//构建数据可读event,将数据连接套接字添加到epoll实例中。同时设置触发模式为边缘触发
struct epoll_event event;
event.data.fd=datasock;
event.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,datasock,&event);
}
//相当于生产者:锁定互斥锁,设置条件,通知等待的线程
void Tcpserver::distribute_events(vector<EventHandler*>&& handlers){
//定义一个作用域块,控制锁的生命周期
{
//把就绪事件线程放入事件队列,工作线程关注事件队列
unique_lock<mutex> lock(m_eventQueueMutex);//锁定互斥锁
for (auto handler : handlers) {
m_eventQueue.push(handler);//设置条件
}
}
//条件变量通知等待的线程
m_eventQueueCond.notify_all();
}
//相当于消费者:锁定互斥锁,检查条件是否满足,执行操作
void Tcpserver::work_thread(){
//工作线程循环等待事件队列的通知
while(true){
EventHandler* handler = nullptr;
//定义一个作用域块,控制锁的生命周期
{
/*
这是一个条件变量等待操作。线程会在 m_eventQueueCond 上等待,直到队列 m_eventQueue 非空。
Lambda 表达式 [this] { return !m_eventQueue.empty(); } 用于检查条件是否满足。[this] 表示捕获当前对象的 this 指针。
*/
unique_lock<mutex> lock(m_eventQueueMutex);//锁定互斥锁,互斥锁在离开作用域后自动释放
m_eventQueueCond.wait(lock, [this] { return !m_eventQueue.empty(); });//检查条件是否满足
//执行操作
handler = m_eventQueue.front();
m_eventQueue.pop();
}
//从就绪事件队列中提取子线程,执行相应的操作(读写数据)
if (handler) {
handler->handleInput(handler->getsocket());
handler->handleOutput(handler->getsocket());
}
}
}
void Tcpserver::start(){
//初始化服务器套接字
init_server_socket();
//创建epoll实例
epfd=epoll_create1(0);
if (epfd < 0) {
perror("epoll_create error");
close(listensock);
exit(EXIT_FAILURE);
}
//将监听套接字添加到epoll实例中
struct epoll_event event;
event.data.fd=listensock;
event.events=EPOLLIN;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listensock, &event) < 0) {
perror("epoll_ctl error");
close(listensock);
close(epfd);
exit(EXIT_FAILURE);
}
//启动工作线程
vector<thread> threads;
for (int i = 0; i < m_numThreads; ++i) {
threads.emplace_back(&Tcpserver::work_thread, this);
}
//事件循环:建立连接,分发就绪事件给工作线程
while(true){
//利用epoll_wait()等待返回已发生的事件[从evlist里面获取已发生的事件]
int nfds = epoll_wait(epfd, events.data(), events.size(), -1);
if (nfds < 0) {
perror("epoll_wait error");
break;
}
//建立就绪事件线程数组
vector<EventHandler*> handlers;
//遍历就绪事件,把读写事件放入线程数组
//【注意:主循环负责监听事件类型和把事件放到线程数组,之后并把线程数组分发到线程队列;在工作线程关注的是线程队列执行子线程的任务】
for(int i=0;i<nfds;i++){
//监听已发生事件文件描述符
int sockfd=events[i].data.fd;
//如果是监听描述符,说明要建立连接
if(sockfd==listensock){
accept_connection();
}
//说明要进行数据传输,需要监听具体是读还是写
else{
//服务器接收,根据哈希映射得到线程
if(events[i].events&EPOLLIN){
cout<<"clitoserv "<<endl;
handlers.push_back(m_handlers[sockfd].get());
}
//服务器发送
if(events[i].events&EPOLLOUT){
handlers.push_back(m_handlers[sockfd].get());
}
}
}
// 分发事件给工作线程
distribute_events(std::move(handlers));
}
// 等待工作线程退出
for (auto& thread : threads) {
thread.join();
}
}
// ssize_t Tcpserver::serrecv(int socket,string& show_inbuff){
// //定义写入的缓冲区
// char buff[1024];
// ssize_t nbytes=recv(socket,buff,sizeof(buff),0);
// //说明没有数据写入
// if (nbytes <= 0) {
// if (nbytes == 0) {
// cout << "Client disconnected" << endl;
// } else {
// perror("recv error");
// }
// }
// else{
// show_inbuff.append(buff,nbytes);
// }
// return nbytes;
// }
// void Tcpserver::sersend(int socket,string& data){
// send(socket,data.data(),data.size(),0);
// }
/*————-连接器类实现———–*/
//服务器读取数据并处理
void ConnectionHandler::handleInput(int sockfd){
//服务器读取数据并显示
string inbuff;
char buff[1024];
ssize_t nbytes=recv(sockfd,buff,sizeof(buff),0);
if (nbytes <= 0) {
if (nbytes == 0) {
cout << "Client disconnected" << endl;
} else {
perror("recv error");
}
}
else{
inbuff.append(buff,nbytes);
cout << "Received data: " << inbuff << endl;
}
}
//服务器输出数据并处理
void ConnectionHandler::handleOutput(int sockfd){
string outbuff = "Hello, Client!";
send(sockfd,outbuff.data(),outbuff.size(),0);
}
//获取服务器套接字
int ConnectionHandler::getsocket() {
return m_sockfd;
}
Tcpclient.h
/*
客户端主要实现功能如下:
1、连接服务器
2、发送数据
3、接收响应
*/
#pragma once
#include<iostream>
using namespace std;
class Tcpclient{
public:
Tcpclient(string ip,int port);
~Tcpclient();
//连接服务器
bool connection();
ssize_t clisend(string& data);
ssize_t clirecv(string& buff);
private:
//客户端ip
string c_ip;
//客户端端口号
int c_port;
//客户端socket
int clisock;
};
Tcpclient.cpp
#include<iostream>
using namespace std;
#include"Tcpclient.h"
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <poll.h>
#include <cstring>
Tcpclient:: Tcpclient(string ip,int port):c_ip(ip),c_port(port),clisock(-1){};
Tcpclient::~Tcpclient(){
if(clisock!=-1){
close(clisock);
}
}
bool Tcpclient::connection(){
//初始化客户端套接字:IPV4,TCP字节流,默认协议
clisock=socket(AF_INET,SOCK_STREAM,0);
//绑定地址
struct sockaddr_in cliaddr;
memset(&cliaddr,0,sizeof(cliaddr));
cliaddr.sin_family=AF_INET;
//此ip和port是用户指定的
cliaddr.sin_addr.s_addr=inet_addr(c_ip.c_str());;
cliaddr.sin_port=htons(c_port);
//连接服务器
if(connect(clisock,(struct sockaddr*)&cliaddr,sizeof(cliaddr))==-1){
return false;
}
else{
return true;
}
}
ssize_t Tcpclient::clisend(string& data){
return send(clisock,data.data(),data.size(),0);
}
ssize_t Tcpclient::clirecv(string& buff){
char temp[1024];
ssize_t nbytes=recv(clisock,temp,sizeof(temp),0);
//说明没有数据接收到
if (nbytes <= 0) {
if (nbytes == 0) {
cout << "服务器关闭连接" << endl;
} else {
perror("recv error");
}
}
else {
buff.append(temp, nbytes);
}
return nbytes;
}
评论前必须登录!
注册