C++项目 | 集群聊天服务器 | 服务器集群
文章目录
- C++项目 | 集群聊天服务器 | 服务器集群
-
- 1.负载均衡存在的意义
- 2.服务器集群
-
- 负载均衡器 – 一致性哈希算法
- nginx配置tcp负载均衡
- 测试
- 3.服务器中间件-基于发布-订阅的Redis
-
- 集群服务器之间的通信设计
- redis发布-订阅的客户端编程
- 代码方面
- 4.集群后的测试
-
- 1.两台服务器进行集群
- 2.登录两个客户端
- 3.分别进行一对一聊天和群聊
1.负载均衡存在的意义
为什么要有负载均衡器?
一台机器的文件描述符是有限的,之前的linux系统最大也就1024,那说明也就同时最多可以提供1024个人的服务聊天服务还是长连接,需要一直保持和服务器的连接才行,那么随着用户规模的扩大,就需要更多的服务器,而负载均衡器就是负责把这些请求分发到不同的服务器上面去。不可能说是给用户三台服务器,让用户自己选要用哪一台提供服务
第一点是基本功能
第二点,保持心跳是以防有的主机发生了故障,不能够继续工作,那么负载均衡器不可以往这个服务器发送请求服务的请求
第三点,一般服务器都是24小时不关的,因为可能24小时都有人请求服务,不可能说是为了添加新的机器就把服务给停掉,这样的话用户体验会很差,所以要有第三点
2.服务器集群
负载均衡器 – 一致性哈希算法
单台服务器受限于硬件资源,其性能是有上限的,当单台服务器不能满足应用场景的并发需求量时,就需要考虑部署多个服务器共同处理客户端的并发请求,但是客户端怎么知道去连接具体哪台服务器呢?此时就需要一台负载均衡器,通过预设的负载算法,指导客户端连接服务器。
负载均衡器有基于客户端的负载均衡和服务器的负载均衡。
普通的基于哈希的负载算法,并不能满足负载均衡所要求的单调性和平衡性,但一致性哈希算法非常好的保持了这两种特性,所以经常用在需要设计负载算法的应用场景当中。
nginx配置tcp负载均衡
在服务器快速集群环境搭建中,都迫切需要一个能拿来即用的负载均衡器,nginx在1.9版本之前,只支持http协议web服务器的负载均衡,从1.9版本开始以后,nginx开始支持tcp的长连接负载均衡,但是nginx默认并没有编译tcp负载均衡模块,编写它时,需要加入–with-stream参数来激活这个模块。
nginx编译加入–with-stream参数激活tcp负载均衡模块
nginx编译安装需要先安装pcre、openssl、zlib等库,也可以直接编译执行下面的configure命令,根
据错误提示信息,安装相应缺少的库。
下面的make命令会向系统路径拷贝文件,需要在root用户下执行
./configure –with-stream
make && make install
编译完成后,默认安装在了/usr/local/nginx目录。
cd /usr/local/nginx/
ls
可执行文件在sbin目录里面,配置文件在conf目录里面。
nginx -s reload 重新加载配置文件启动
nginx -s stop 停止nginx服务
主要在conf目录里面配置nginx.conf文件,配置如下:
很多东西之前都有,把下面的部分加上就是了
stream {
upstream MyServer {
#weight是权重 max_fails表示心跳,只要相应超过30秒的次数超过3次,那这个服务器就算是挂掉了
server 127.0.0.1:6000 weight=1 max_fails=3 fail_timeout=30s;
server 127.0.0.1:6002 weight=1 max_fails=3 fail_timeout=30s;
}
server {
proxy_connect_timeout 1s;
#proxy_timeout 3s;
#ngxin监听的端口号,客户端都连接这个端口号,然后ngxin分发到其他服务器
listen 8000;
proxy_pass MyServer;
tcp nodelay on;
}
}
配置完成后,./nginx -s reload平滑重启。
测试
// src/server/main.cpp
#include"chatserver.hpp"
#include"chatservice.hpp"
#include<iostream>
#include<signal.h>
using namespace std;
//处理器ctrl + c结束后,重置user 的状态信息,把登录变成未登录
void resetHandler(int)
{
ChatService::instance()->reset();
exit(0);
}
int main(int argc,char **argv)
{
if(argc<3)
{
cerr<<"参数过少!example:./ChatServer 127.0.0.1 6000"<<endl;
}
//解析通过命令行参数传递的ip和port
char *ip=argv[1];
uint16_t port=atoi(argv[2]);
signal(SIGINT,resetHandler);
EventLoop loop;
InetAddress addr(ip,port);
ChatServer server(&loop,addr,"ChatServer");
server.start();
loop.loop();
return 0;
}
1.启动两台服务器
2.启动两个客户端
客户端都是8000端口,就是nginx监听的端口
两个客户端都可以登录,注册之类的操作
但聊天现在还不行,因为_userConnMap并不相通
3.服务器中间件-基于发布-订阅的Redis
集群服务器之间的通信设计
当ChatServer集群部署多台服务器以后,当登录在不同服务器上的用户进行通信时,该怎么设计!如下设计好吗?
上面的设计,让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。这样的设计使得各个服务器之间耦合度太高,不利于系统扩展,并且会占用系统大量的socket资源,各服务器之间的带宽压力很大,不能够节省资源给更多的客户端提供服务,因此绝对不是一个好的设计。
集群部署的服务器之间进行通信,最好的方式就是引入中间件消息队列,解耦各个服务器,使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,如下图所示:
在集群分布式环境中,经常使用的中间件消息队列有ActiveMQ、RabbitMQ、Kafka等,都是应用场景广泛并且性能很好的消息队列,供集群服务器之间,分布式服务之间进行消息通信。限于我们的项目业务类型并不是非常复杂,对并发请求量也没有太高的要求,因此我们的中间件消息队列选型的是-基于发布-订阅模式的redis。
redis发布-订阅的客户端编程
redis支持多种不同的客户端编程语言,例如Java对应jedis、php对应phpredis、C++对应的则是
hiredis。下面是安装hiredis的步骤:
1.git clone https://github.com/redis/hiredis 从github上下载hiredis客户端,进行源码
编译安装
tony@tony-virtual-machine:~/github$ git clone
https://github.com/redis/hiredis
正克隆到 'hiredis'…
remote: Enumerating objects: 3261 , done.
^C收对象中: 83 % (2707/3261), 876 .01 KiB | 59 .00 KiB/s
2.cd hiredis
3.make
tony@tony-virtual-machine:~/github/hiredis$ make
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-
strings -Wno-missing-field-initializers -g -ggdb net.c
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-
strings -Wno-missing-field-initializers -g -ggdb hiredis.c
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-
strings -Wno-missing-field-initializers -g -ggdb sds.c
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-
strings -Wno-missing-field-initializers -g -ggdb async.c
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-
strings -Wno-missing-field-initializers -g -ggdb read.c
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-
strings -Wno-missing-field-initializers -g -ggdb sockcompat.c
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-
strings -Wno-missing-field-initializers -g -ggdb sslio.c
cc -shared -Wl,-soname,libhiredis.so.0.14 -o libhiredis.so net.o hiredis.o
sds.o async.o read.o sockcompat.o sslio.o
ar rcs libhiredis.a net.o hiredis.o sds.o async.o read.o sockcompat.o
sslio.o
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-
strings -Wno-missing-field-initializers -g -ggdb test.c
cc -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings -Wno-missing-
field-initializers -g -ggdb -o hiredis-test test.o libhiredis.a
Generating hiredis.pc for pkgconfig…
tony@tony-virtual-machine:~/github/hiredis$
编译成功!
4.sudo make install
tony@tony-virtual-machine:~/github/hiredis$ sudo make install
[sudo] tony 的密码:
mkdir -p /usr/local/include/hiredis /usr/local/include/hiredis/adapters
/usr/local/lib
cp -pPR hiredis.h async.h read.h sds.h sslio.h /usr/local/include/hiredis
cp -pPR adapters/*.h /usr/local/include/hiredis/adapters
cp -pPR libhiredis.so /usr/local/lib/libhiredis.so.0.
cd /usr/local/lib && ln -sf libhiredis.so.0.14 libhiredis.so
cp -pPR libhiredis.a /usr/local/lib
mkdir -p /usr/local/lib/pkgconfig
cp -pPR hiredis.pc /usr/local/lib/pkgconfig
拷贝生成的动态库到/usr/local/lib目录下!
5.sudo ldconfig /usr/local/lib
代码方面
//redis.hpp
#ifndef REDIS_H
#define REDIS_H
#include <hiredis/hiredis.h>
#include <thread>
#include <functional>
using namespace std;
using redis_handler = function<void(int,string)>;
class Redis
{
public:
Redis();
~Redis();
//连接Redis服务器
bool connect();
//向Redis指定的通道channel发布消息
bool publish(int channel, string message);
//向Redis指定的通道subscribe订阅消息
bool subscribe(int channel);
//取消订阅
bool unsubscribe(int channel);
//独立线程中接收订阅通道的消息
void observer_channel_message();
//初始化业务层上报通道消息的回调对象
void init_notify_handler(redis_handler handler);
private:
//hiredis同步上下文对象,负责publish消息
redisContext *publish_context_;
//负责subscribe消息
redisContext *subcribe_context_;
//回调操作,收到消息给service上报
redis_handler notify_message_handler_;
};
#endif
//redis.cpp
#include "redis.hpp"
#include <iostream>
Redis::Redis() : publish_context_(nullptr), subcribe_context_(nullptr)
{
}
Redis::~Redis()
{
if (publish_context_ != nullptr)
{
redisFree(publish_context_);
}
if (subcribe_context_ != nullptr)
{
redisFree(subcribe_context_);
}
}
//连接Redis服务器
bool Redis::connect()
{
publish_context_ = redisConnect("127.0.0.1", 6379);
if (publish_context_ == nullptr)
{
cerr << "connect redis failed!" << endl;
return false;
}
subcribe_context_ = redisConnect("127.0.0.1", 6379);
if (subcribe_context_ == nullptr)
{
cerr << "connect redis failed!" << endl;
return false;
}
// 独立线程中接收订阅通道的消息
thread t([&]() {
observer_channel_message();
});
t.detach();
cout << "connect redis-server success!" << endl;
return true;
}
//向Redis指定的通道channel发布消息
bool Redis::publish(int channel, string message)
{
// 相当于publish 键 值
// redis 127.0.0.1:6379> PUBLISH runoobChat "Redis PUBLISH test"
redisReply *reply = (redisReply *)redisCommand(publish_context_, "PUBLISH %d %s", channel, message.c_str());
if (reply == nullptr)
{
cerr << "publish command failed!" << endl;
return false;
}
// 释放资源
freeReplyObject(reply);
return true;
}
// 向Redis指定的通道subscribe订阅消息
bool Redis::subscribe(int channel)
{
// redisCommand 会先把命令缓存到context中,然后调用RedisAppendCommand发送给redis
// redis执行subscribe是阻塞,不会响应,不会给我们一个reply
// redis 127.0.0.1:6379> SUBSCRIBE runoobChat
if (REDIS_ERR == redisAppendCommand(subcribe_context_, "SUBSCRIBE %d", channel))
{
cerr << "subscibe command failed" << endl;
return false;
}
int done = 0;
while (!done)
{
if (REDIS_ERR == redisBufferWrite(subcribe_context_, &done))
{
cerr << "subscribe command failed" << endl;
return false;
}
}
return true;
}
//取消订阅
bool Redis::unsubscribe(int channel)
{
//redisCommand 会先把命令缓存到context中,然后调用RedisAppendCommand发送给redis
//redis执行subscribe是阻塞,不会响应,不会给我们一个reply
if (REDIS_ERR == redisAppendCommand(subcribe_context_, "UNSUBSCRIBE %d", channel))
{
cerr << "subscibe command failed" << endl;
return false;
}
int done = 0;
while (!done)
{
if (REDIS_ERR == redisBufferWrite(subcribe_context_, &done))
{
cerr << "subscribe command failed" << endl;
return false;
}
}
return true;
}
//独立线程中接收订阅通道的消息
void Redis::observer_channel_message()
{
redisReply *reply = nullptr;
while (REDIS_OK == redisGetReply(subcribe_context_, (void **)&reply))
{
//reply里面是返回的数据有三个,0. message , 1.通道号,2.消息
if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr)
{
//给业务层上报消息
notify_message_handler_(atoi(reply->element[1]->str), reply->element[2]->str);
}
freeReplyObject(reply);
}
cerr << "———————– oberver_channel_message quit————————–" << endl;
}
//初始化业务层上报通道消息的回调对象
void Redis::init_notify_handler(redis_handler handler)
{
notify_message_handler_ = handler;
}
cahtService.hpp和cpp的修改见github地址
4.集群后的测试
1.两台服务器进行集群
2.登录两个客户端
3.分别进行一对一聊天和群聊
测试成功!
评论前必须登录!
注册