SpringBoot+MQTT+apollo实现订阅发布功能,详细代码解析

由于最近公司在开发一款后台与安卓的更新系统,经过再三研究之后,也是选择Mqtt这个目前流行的框架。为了能够让项目运营起来,最终虽说是选择ActiveMQ。但在这个过程中,也是发现Apollo作为服务器也是相当不错。当然对于后者已经被apace放弃,不过今天还是和大家整理一下

SpringBoot+MQTT+apollo实现订阅发布功能的全过程。


对于项目首先需要用到的前提东西,比如Apollo如何下载,以及MQTT测试工具在这里就不多说。如果真的不懂私聊Damon吧,在这里就不浪费时间。

1.对于项目,首先你所需要引入maven包:
pom.xml

<code>         
            
    org.springframework.integration            
    spring-integration-stream        
        
            
    org.springframework.integration          
    spring-integration-mqtt        

/<code>

其目标就是将MQTT用在项目组中
接着就是项目yml文件的配置,使用properties,以葫芦画瓢就行了:

applicaiton.yml:

<code>

    mqtt:
      username: admin
      password: password
      host-url: tcp://127.0.0.1:8161        # 你自己服务器的地址和端口,这个需要改
      clientID: test1                            # 这个改不改随意,但不同的客户端肯定不能一样
      default-topic: home/garden/fountain                        # 默认主题
      timeout: 100
      keepalive: 100
    
    # Tomcat
    server:
      tomcat:
        uri-encoding: UTF-8
        max-threads: 1000
        min-spare-threads: 30
      port: 8088

/<code>

注意host-url,这就是你apollo的地址


来到第三步,此时就是项目内的文件:

MqttConfig文件

<code>
    @Component
    @ConfigurationProperties("mqtt")
    @Setter
    @Getter
    public class MqttConfig {
        @Autowired
        private MqttPushClient mqttPushClient;
    
        /**
         * 用户名
         */
        // @Value("username")
        private String username;
        /**
         * 密码
         */
        private String password;
        /**
         * 连接地址
         */
        private String hostUrl;
        /**
         * 客户Id
         */
        private String clientID;
        /**
         * 默认连接话题
         */
        private String defaultTopic;
        /**
         * 超时时间
         */
        private int timeout;
        /**
         * 保持连接数
         */
        private int keepalive;
    
        @Bean
        public MqttPushClient getMqttPushClient() {
            System.out.println("hostUrl: "+ hostUrl);
            System.out.println("clientID: "+ clientID);
            System.out.println("username: "+ username);
            System.out.println("password: "+ password);
            System.out.println("timeout: "+timeout);
            System.out.println("keepalive: "+ keepalive);
            mqttPushClient.connect(hostUrl, clientID, username, password, timeout, keepalive);
            // 以/#结尾表示订阅所有以test开头的主题
            mqttPushClient.subscribe(defaultTopic, 0);
            return mqttPushClient;
        }
    }
/<code>

目的就是配置所对应的消息
第四步就是发布以及订阅等功能:
MqttPushClient

<code>
    @Component
    public class MqttPushClient {
        private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
    
        @Autowired
        private PushCallback pushCallback;
    
        private static MqttClient client;
    
        private static MqttClient getClient() {
            return client;
        }
    
        private static void setClient(MqttClient client) {
            MqttPushClient.client = client;
        }
    
        /**
         * 客户端连接
         *
         * @param host      ip+端口
         * @param clientID  客户端Id
         * @param username  用户名
         * @param password  密码
         * @param timeout   超时时间
         * @param keepalive 保留数
         */
        public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
            MqttClient client;
            try {
                client = new MqttClient(host, clientID, new MemoryPersistence());
                MqttConnectOptions options = new MqttConnectOptions();
                options.setCleanSession(true);
                options.setUserName(username);
                options.setPassword(password.toCharArray());
                options.setConnectionTimeout(timeout);
                options.setKeepAliveInterval(keepalive);
                MqttPushClient.setClient(client);
                try {
                    client.setCallback(pushCallback);
                    client.connect(options);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 发布
         *
         * @param qos         连接方式
         * @param retained    是否保留
         * @param topic       主题
         * @param pushMessage 消息体
         */
        public void publish(int qos, boolean retained, String topic, String pushMessage) {
            MqttMessage message = new MqttMessage();
            message.setQos(qos);
            message.setRetained(retained);
            message.setPayload(pushMessage.getBytes());
            MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
            if (null == mTopic) {
                logger.error("topic not exist");
            }
            MqttDeliveryToken token;
            try {
                token = mTopic.publish(message);
                token.waitForCompletion();
            } catch (MqttPersistenceException e) {
                e.printStackTrace();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 订阅某个主题
         *
         * @param topic 主题
         * @param qos   连接方式
         */
        public void subscribe(String topic, int qos) {
            logger.info("开始订阅主题" + topic);
            try {
                MqttPushClient.getClient().subscribe(topic, qos);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }
/<code>

订阅主题以及发布的方式等内容更多编写
最后再搞个测试看看我们的结果是否正确:
TestController

<code>
    @RestController
    @RequestMapping("/")
    public class TestController {
    
        @Autowired
        private MqttPushClient mqttPushClient;
    
        @GetMapping(value = "/publishTopic")
        public String publishTopic() {
            String topicString = "home/garden/fountain";
            mqttPushClient.publish(0, false, topicString, "测试一下发布消息");
            return "ok";
        }
        // 发送自定义消息内容(使用默认主题)
        @RequestMapping("/publishTopic/{data}")
        public String test1(@PathVariable("data") String data) {
            String topicString = "home/garden/fountain";
            mqttPushClient.publish(0,false,topicString, data);
            return "ok";
        }
    
        // 发送自定义消息内容,且指定主题
        @RequestMapping("/publishTopic/{topic}/{data}")
        public String test2(@PathVariable("topic") String topic, @PathVariable("data") String data) {
            mqttPushClient.publish(0,false,topic, data);
            return "ok";
        }
    }
/<code>

如此一来就OK!
你可以使用MQTT.fx进行测试。用Postman发出,就能够查看最终的结果。在这里,因为时间的原因就不多说,有啥有趣的问题,咱们可以一同探讨。
希望你希望,Damon将会不断的分享各种有趣的开发小故事给大家娱乐。下一期,或在POI实现导出导入或者是ActiveMQ进行选择。


最后一句:远赴人间惊鸿宴 一睹人间盛世颜 努力拼搏,向前栈不断前进!


SpringBoot+MQTT+apollo实现订阅发布功能,详细代码解析


分享到:


相關文章: