一、EMQX 认证方式
EMQ X 是一个高度可扩展的 MQTT 消息服务器,它支持多种认证和授权机制来确保安全性。以下是 EMQ X 支持的一些常见认证方式:
内置用户认证:
- EMQ X 拥有一个内置的用户数据库,可以存储用户名和密码,进行基本的认证。
HTTP 认证:
- 通过 HTTP 接口进行认证,可以集成外部的用户系统。
数据库认证:
- 支持 MySQL、PostgreSQL、SQLite 等多种数据库,通过数据库查询进行用户认证。
LDAP 认证:
- 支持与 LDAP 服务器集成,进行目录服务认证。
JWT(JSON Web Tokens):
- 支持 JWT 作为认证机制,适用于分布式系统和微服务架构。
OAuth 2.0:
- 支持 OAuth 2.0 授权框架,可以与各种 OAuth 服务提供商集成。
API 密钥:
- 通过 API 密钥进行认证,适用于不需要用户身份验证的简单场景。
MQTT 证书认证:
- 支持 SSL/TLS 证书认证,适用于需要加密传输的场合。
访问控制列表(ACL):
- 使用 ACL 管理用户或客户端的权限,控制对特定主题的访问。
外部脚本认证:
- 允许使用外部脚本进行复杂的认证逻辑处理。
Kerberos 认证:
- 对于需要 Kerberos 认证的企业环境,EMQ X 也提供了相应的支持。
Nest 认证:
- 支持 Nest 协议进行设备认证,适用于物联网设备。
每种认证方式都有其适用场景和配置方法。例如,数据库认证适用于需要集中管理用户信息的大型系统,而 MQTT 证书认证适用于需要加密通信的金融或医疗行业。
要配置认证方式,通常需要在 EMQ X 的配置文件 emqx.conf 中进行相应的设置,或者通过 EMQ X Dashboard 进行配置
二、EMQX常用认证方式(EMQX 界面配置)
1.内置用户认证
1.1 创建内置用户认证客户端认证
1.2 选择数据库类型
1.3 选择账号类型 这里选择username
1.4 创建用户
1.5 java实现emqt集成
1.5.1 pom.xml 中引入 mqtt相关jar
<!– mqtt –>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1.5.2 application.yml 配置mq连接信息
1.5.3 编写实现代码
MqttConfiguration.java
package com.chopin.mqtt.mqtt.config;
import com.chopin.mqtt.mqtt.client.MyMQTTClient;
import com.chopin.mqtt.mqtt.utils.JwtUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @class MqttConfiguration
* @description
* @copyright Copyright: 2024-2030
* @creator chopin
* @create-time 2024/8/08 14:38
**/
@Configuration
@Slf4j
@Data
public class MqttConfiguration {
@Value("${mqtt.host}")
String host;
@Value("${mqtt.username}")
String username;
@Value("${mqtt.password}")
String password;
@Value("${mqtt.clientId}")
String clientId;
@Value("${mqtt.timeout}")
int timeOut;
@Value("${mqtt.keepalive}")
int keepAlive;
@Value("${mqtt.topic1}")
public String topic1;
@Value("${mqtt.topic2}")
public String topic2;
@Value("${mqtt.topic3}")
public String topic3;
@Value("${mqtt.topic4}")
public String topic4;
@Bean//注入spring
public MyMQTTClient myMQTTClient() {
MyMQTTClient myMQTTClient = new MyMQTTClient(host, username, password, clientId, timeOut, keepAlive);
try {
myMQTTClient.connect();
//不同的主题
// myMQTTClient.subscribe(topic1, 1);
// myMQTTClient.subscribe(topic2, 1);
// myMQTTClient.subscribe(topic3, 1);
// myMQTTClient.subscribe(topic4, 1);
} catch (MqttException e) {
log.error("MQTT connect exception:{} ", e.getMessage());
}
return myMQTTClient;
}
}
MyMQTTClient.java
package com.chopin.mqtt.mqtt.client;
import com.chopin.mqtt.mqtt.callback.MyMQTTCallback;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* @class MyMQTTClient
* @description
* @copyright Copyright: 2024-2030
* @creator chopin
* @create-time 2024/8/08 14:41
**/
@Slf4j
public class MyMQTTClient {
private static MqttClient client;
private String host;
private String username;
private String password;
private String clientId;
private int timeout;
private int keepalive;
public MyMQTTClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {
this.host = host;
this.username = username;
this.password = password;
this.clientId = clientId;
this.timeout = timeOut;
this.keepalive = keepAlive;
}
public static MqttClient getClient() {
return client;
}
public static void setClient(MqttClient client) {
MyMQTTClient.client = client;
}
/**
* 设置mqtt连接参数
*
* @param username
* @param password
* @param timeout
* @param keepalive
* @return
*/
public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setCleanSession(true);
options.setAutomaticReconnect(true);
return options;
}
/**
* 连接mqtt服务端,得到MqttClient连接对象
*/
public void connect() throws MqttException {
if (client == null) {
client = new MqttClient(host, clientId, new MemoryPersistence());
client.setCallback(new MyMQTTCallback(MyMQTTClient.this));
}
MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
if (!client.isConnected()) {
client.connect(mqttConnectOptions);
} else {
client.disconnect();
client.connect(mqttConnectOptions);
}
log.info("MQTT connect success");//未发生异常,则连接成功
}
/**
* 发布,默认qos为0,非持久化
*
* @param pushMessage
* @param topic
*/
public void publish(String pushMessage, String topic) {
publish(pushMessage, topic, 0, false);
}
/**
* 发布消息
*
* @param pushMessage
* @param topic
* @param qos
* @param retained:留存
*/
public void publish(String pushMessage, String topic, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setPayload(pushMessage.getBytes());
message.setQos(qos);
message.setRetained(retained);
MqttTopic mqttTopic = MyMQTTClient.getClient().getTopic(topic);
if (null == mqttTopic) {
log.error("topic is not exist");
}
MqttDeliveryToken token;//Delivery:配送
synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
try {
token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
token.waitForCompletion(1000L);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 订阅某个主题
*
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
try {
MyMQTTClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 取消订阅主题
*
* @param topic 主题名称
*/
public void cleanTopic(String topic) {
if (client != null && client.isConnected()) {
try {
client.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
} else {
System.out.println("取消订阅失败!");
}
}
}
MyMQTTCallback.java
package com.chopin.mqtt.mqtt.callback;
import com.chopin.mqtt.mqtt.client.MyMQTTClient;
import com.chopin.mqtt.mqtt.config.MqttConfiguration;
import com.chopin.mqtt.mqtt.dto.MqttMsg;
import com.chopin.mqtt.mqtt.utils.SpringUtils;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.alibaba.fastjson.JSON;
import java.util.Map;
/**
* @class MyMQTTCallback
* @description
* @copyright Copyright: 2024-2030
* @creator chopin
* @create-time 2024/8/08 14:43
**/
@Slf4j
public class MyMQTTCallback implements MqttCallbackExtended {
//手动注入
private MqttConfiguration mqttConfiguration = SpringUtils.getBean(MqttConfiguration.class);
private MyMQTTClient myMQTTClient;
public MyMQTTCallback(MyMQTTClient myMQTTClient) {
this.myMQTTClient = myMQTTClient;
}
/**
* 丢失连接,可在这里做重连
* 只会调用一次
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage());
long reconnectTimes = 1;
while (true) {
try {
if (MyMQTTClient.getClient().isConnected()) {
//判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择
log.warn("mqtt reconnect success end 重新连接 重新订阅成功");
return;
}
reconnectTimes+=1;
log.warn("mqtt reconnect times = {} try again… mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
MyMQTTClient.getClient().reconnect();
} catch (MqttException e) {
log.error("mqtt断连异常", e);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
/**
* @param topic
* @param mqttMessage
* @throws Exception
* subscribe后得到的消息会执行到这里面
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));
//发布消息主题
if (topic.contains("B/pick/warn/")){
MqttMsg mqttMsg = JSON.parseObject(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8), MqttMsg.class);
//你自己的业务接口
log.info("接收消息主题 : {},接收消息内容 : {}", topic, JSON.toJSONString(mqttMsg));
}
if (topic.equals("embed/resp")){
Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));
//你自己的业务接口
}
//接收报警主题
if (topic.equals("embed/warn")){
Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));
//你自己的业务接口
}
}
/**
*连接成功后的回调 可以在这个方法执行 订阅主题 生成Bean的 MqttConfiguration方法中订阅主题 出现bug
*重新连接后 主题也需要再次订阅 将重新订阅主题放在连接成功后的回调 比较合理
* @param reconnect
* @param serverURI
*/
@Override
public void connectComplete(boolean reconnect,String serverURI){
log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");
//订阅主题
myMQTTClient.subscribe(mqttConfiguration.topic1, 1);
myMQTTClient.subscribe("B/pick/warn/#", 1);
myMQTTClient.subscribe(mqttConfiguration.topic3, 1);
myMQTTClient.subscribe(mqttConfiguration.topic4, 1);
}
/**
* 消息到达后
* subscribe后,执行的回调函数
*
* @param s
* @param mqttMessage
* @throws Exception
*/
/**
* publish后,配送完成后回调的方法
*
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
}
}
运行测试:
emqx 面板
ws 数据订阅
评论前必须登录!
注册