云计算百科
云计算领域专业知识百科平台

Emqx入门系列【3】客户端认证方式之-内置用户认证

一、EMQX 认证方式

EMQ X 是一个高度可扩展的 MQTT 消息服务器,它支持多种认证和授权机制来确保安全性。以下是 EMQ X 支持的一些常见认证方式:

  • 内置用户认证:

    • EMQ X 拥有一个内置的用户数据库,可以存储用户名和密码,进行基本的认证。
  • HTTP 认证:

    • 通过 HTTP 接口进行认证,可以集成外部的用户系统。
  • 数据库认证:

    • 支持 MySQL、PostgreSQL、SQLite 等多种数据库,通过数据库查询进行用户认证。
  • LDAP 认证:

    • 支持与 LDAP 服务器集成,进行目录服务认证。
  • JWT(JSON Web Tokens):

    • 支持 JWT 作为认证机制,适用于分布式系统和微服务架构。
  • OAuth 2.0:

    • 支持 OAuth 2.0 授权框架,可以与各种 OAuth 服务提供商集成。
  • API 密钥:

    • 通过 API 密钥进行认证,适用于不需要用户身份验证的简单场景。
  • MQTT 证书认证:

    • 支持 SSL/TLS 证书认证,适用于需要加密传输的场合。
  • 访问控制列表(ACL):

    • 使用 ACL 管理用户或客户端的权限,控制对特定主题的访问。
  • 外部脚本认证:

    • 允许使用外部脚本进行复杂的认证逻辑处理。
  • Kerberos 认证:

    • 对于需要 Kerberos 认证的企业环境,EMQ X 也提供了相应的支持。
  • Nest 认证:

    • 支持 Nest 协议进行设备认证,适用于物联网设备。
  • 每种认证方式都有其适用场景和配置方法。例如,数据库认证适用于需要集中管理用户信息的大型系统,而 MQTT 证书认证适用于需要加密通信的金融或医疗行业。

    要配置认证方式,通常需要在 EMQ X 的配置文件 emqx.conf 中进行相应的设置,或者通过 EMQ X Dashboard 进行配置

     

    二、EMQX常用认证方式(EMQX 界面配置)

    1.内置用户认证

    1.1 创建内置用户认证客户端认证

    af9349c3bd384fd0ac29c1a98b0e16d8.png

    1.2 选择数据库类型

    ad8d2c68f8ba4cc082f8ee31fe841f8e.png

    1.3 选择账号类型 这里选择username 

    ff41cd13c291427aaf8bca2d46949bd8.png

    1.4 创建用户

    fdf6969d69f741f8a1564890045ac5a9.png

    d94187024dc24e7bb19d97671b441bf5.png

    1.5 java实现emqt集成

    1.5.1 pom.xml 中引入 mqtt相关jar

    <!– mqtt –>
    <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    </dependency>

    1.5.2  application.yml 配置mq连接信息

    369f0f819076413dbbc0fb88b382d7ba.png

    1.5.3  编写实现代码

    MqttConfiguration.java
    package com.chopin.mqtt.mqtt.config;

    import com.chopin.mqtt.mqtt.client.MyMQTTClient;
    import com.chopin.mqtt.mqtt.utils.JwtUtil;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
    * @class MqttConfiguration
    * @description
    * @copyright Copyright: 2024-2030
    * @creator chopin
    * @create-time 2024/8/08 14:38
    **/
    @Configuration
    @Slf4j
    @Data
    public class MqttConfiguration {

    @Value("${mqtt.host}")
    String host;
    @Value("${mqtt.username}")
    String username;
    @Value("${mqtt.password}")
    String password;
    @Value("${mqtt.clientId}")
    String clientId;
    @Value("${mqtt.timeout}")
    int timeOut;
    @Value("${mqtt.keepalive}")
    int keepAlive;
    @Value("${mqtt.topic1}")
    public String topic1;
    @Value("${mqtt.topic2}")
    public String topic2;
    @Value("${mqtt.topic3}")
    public String topic3;
    @Value("${mqtt.topic4}")
    public String topic4;

    @Bean//注入spring
    public MyMQTTClient myMQTTClient() {
    MyMQTTClient myMQTTClient = new MyMQTTClient(host, username, password, clientId, timeOut, keepAlive);

    try {
    myMQTTClient.connect();
    //不同的主题
    // myMQTTClient.subscribe(topic1, 1);
    // myMQTTClient.subscribe(topic2, 1);
    // myMQTTClient.subscribe(topic3, 1);
    // myMQTTClient.subscribe(topic4, 1);

    } catch (MqttException e) {
    log.error("MQTT connect exception:{} ", e.getMessage());
    }

    return myMQTTClient;
    }

    }

    MyMQTTClient.java
    package com.chopin.mqtt.mqtt.client;

    import com.chopin.mqtt.mqtt.callback.MyMQTTCallback;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

    /**
    * @class MyMQTTClient
    * @description
    * @copyright Copyright: 2024-2030
    * @creator chopin
    * @create-time 2024/8/08 14:41
    **/
    @Slf4j
    public class MyMQTTClient {
    private static MqttClient client;
    private String host;
    private String username;
    private String password;
    private String clientId;
    private int timeout;
    private int keepalive;

    public MyMQTTClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {
    this.host = host;
    this.username = username;
    this.password = password;
    this.clientId = clientId;
    this.timeout = timeOut;
    this.keepalive = keepAlive;
    }

    public static MqttClient getClient() {
    return client;
    }

    public static void setClient(MqttClient client) {
    MyMQTTClient.client = client;
    }

    /**
    * 设置mqtt连接参数
    *
    * @param username
    * @param password
    * @param timeout
    * @param keepalive
    * @return
    */
    public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(username);
    options.setPassword(password.toCharArray());
    options.setConnectionTimeout(timeout);
    options.setKeepAliveInterval(keepalive);
    options.setCleanSession(true);
    options.setAutomaticReconnect(true);
    return options;
    }

    /**
    * 连接mqtt服务端,得到MqttClient连接对象
    */
    public void connect() throws MqttException {
    if (client == null) {
    client = new MqttClient(host, clientId, new MemoryPersistence());
    client.setCallback(new MyMQTTCallback(MyMQTTClient.this));
    }
    MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
    if (!client.isConnected()) {
    client.connect(mqttConnectOptions);
    } else {
    client.disconnect();
    client.connect(mqttConnectOptions);
    }
    log.info("MQTT connect success");//未发生异常,则连接成功
    }

    /**
    * 发布,默认qos为0,非持久化
    *
    * @param pushMessage
    * @param topic
    */
    public void publish(String pushMessage, String topic) {
    publish(pushMessage, topic, 0, false);
    }

    /**
    * 发布消息
    *
    * @param pushMessage
    * @param topic
    * @param qos
    * @param retained:留存
    */
    public void publish(String pushMessage, String topic, int qos, boolean retained) {
    MqttMessage message = new MqttMessage();
    message.setPayload(pushMessage.getBytes());
    message.setQos(qos);
    message.setRetained(retained);
    MqttTopic mqttTopic = MyMQTTClient.getClient().getTopic(topic);
    if (null == mqttTopic) {
    log.error("topic is not exist");
    }
    MqttDeliveryToken token;//Delivery:配送
    synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
    try {
    token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
    token.waitForCompletion(1000L);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    /**
    * 订阅某个主题
    *
    * @param topic
    * @param qos
    */
    public void subscribe(String topic, int qos) {
    try {
    MyMQTTClient.getClient().subscribe(topic, qos);
    } catch (MqttException e) {
    e.printStackTrace();
    }
    }

    /**
    * 取消订阅主题
    *
    * @param topic 主题名称
    */
    public void cleanTopic(String topic) {
    if (client != null && client.isConnected()) {
    try {
    client.unsubscribe(topic);
    } catch (MqttException e) {
    e.printStackTrace();
    }
    } else {
    System.out.println("取消订阅失败!");
    }
    }
    }

    MyMQTTCallback.java

    package com.chopin.mqtt.mqtt.callback;

    import com.chopin.mqtt.mqtt.client.MyMQTTClient;
    import com.chopin.mqtt.mqtt.config.MqttConfiguration;
    import com.chopin.mqtt.mqtt.dto.MqttMsg;
    import com.chopin.mqtt.mqtt.utils.SpringUtils;
    import io.netty.util.CharsetUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import com.alibaba.fastjson.JSON;
    import java.util.Map;

    /**
    * @class MyMQTTCallback
    * @description
    * @copyright Copyright: 2024-2030
    * @creator chopin
    * @create-time 2024/8/08 14:43
    **/
    @Slf4j
    public class MyMQTTCallback implements MqttCallbackExtended {
    //手动注入
    private MqttConfiguration mqttConfiguration = SpringUtils.getBean(MqttConfiguration.class);
    private MyMQTTClient myMQTTClient;
    public MyMQTTCallback(MyMQTTClient myMQTTClient) {
    this.myMQTTClient = myMQTTClient;
    }
    /**
    * 丢失连接,可在这里做重连
    * 只会调用一次
    *
    * @param throwable
    */
    @Override
    public void connectionLost(Throwable throwable) {
    log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage());
    long reconnectTimes = 1;
    while (true) {
    try {
    if (MyMQTTClient.getClient().isConnected()) {
    //判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择
    log.warn("mqtt reconnect success end 重新连接 重新订阅成功");
    return;
    }
    reconnectTimes+=1;
    log.warn("mqtt reconnect times = {} try again… mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
    MyMQTTClient.getClient().reconnect();
    } catch (MqttException e) {
    log.error("mqtt断连异常", e);
    }
    try {
    Thread.sleep(5000);
    } catch (InterruptedException e1) {
    e1.printStackTrace();
    }
    }
    }
    /**
    * @param topic
    * @param mqttMessage
    * @throws Exception
    * subscribe后得到的消息会执行到这里面
    */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
    log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));
    //发布消息主题
    if (topic.contains("B/pick/warn/")){
    MqttMsg mqttMsg = JSON.parseObject(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8), MqttMsg.class);
    //你自己的业务接口
    log.info("接收消息主题 : {},接收消息内容 : {}", topic, JSON.toJSONString(mqttMsg));

    }
    if (topic.equals("embed/resp")){
    Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));
    //你自己的业务接口

    }
    //接收报警主题
    if (topic.equals("embed/warn")){
    Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));
    //你自己的业务接口
    }
    }

    /**
    *连接成功后的回调 可以在这个方法执行 订阅主题 生成Bean的 MqttConfiguration方法中订阅主题 出现bug
    *重新连接后 主题也需要再次订阅 将重新订阅主题放在连接成功后的回调 比较合理
    * @param reconnect
    * @param serverURI
    */
    @Override
    public void connectComplete(boolean reconnect,String serverURI){
    log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");
    //订阅主题
    myMQTTClient.subscribe(mqttConfiguration.topic1, 1);
    myMQTTClient.subscribe("B/pick/warn/#", 1);
    myMQTTClient.subscribe(mqttConfiguration.topic3, 1);
    myMQTTClient.subscribe(mqttConfiguration.topic4, 1);
    }
    /**
    * 消息到达后
    * subscribe后,执行的回调函数
    *
    * @param s
    * @param mqttMessage
    * @throws Exception
    */
    /**
    * publish后,配送完成后回调的方法
    *
    * @param iMqttDeliveryToken
    */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
    }
    }

    运行测试:

    f73dca31ea364f209cc9d69ae2aac8c0.png

     

    emqx 面板

    a7561bd1e75d49e196e9a8499a202526.png

    ws 数据订阅

    5408079f23a747b9986e0aeccf23c7bf.png

    c48f661f70874bd99b6732784fc4a262.png

     

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » Emqx入门系列【3】客户端认证方式之-内置用户认证
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!