文章目录
前言MQTTMQTT 实现Eclipse MosquittoApache Apollo
Spring Boot Integration MQTTgradle 依赖MQTT Config发送消息订阅消息总结
前言
项目中有针对自己公司研发的终端设备需要远程控制和远程参数设置的功能,还需要将设备上报的状态数据监控到,在技术选型上,一般这种情况都选择Mina 或者 Netty 做长链接。 但是长链接也有问题针对于协议设计和安全性上需要重新设计和考虑。 所以在技术选型上直接按照现有协议来选择,比较完善的及时通信协议有MQTT、XMPP、JMS等。 最终因为MQTT协议低耦合、轻量级的特点,选择成为设备远程控制协议。
MQTT
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
引用自菜鸟教程 MQTT 内容 https://www.runoob.com/w3cnote/mqtt-intro.html
MQTT 实现
协议终归只是规范一个过程,但是基于规范肯定是要实现的。 MQTT 既然是一个成熟协议,那么实现该协议的中间件也就会有很多,我们列举出常用的两个实现,供各位选择。
Eclipse Mosquitto
看名字就知道是Java世界有名的Eclipse 基金会下的一个MQTT 协议的通信中间件。C语言开发的,性能和稳定性没得说。 配置不在这边赘述,需要学习的话,建议参考其他博主的教程,或者去官网学习。
官方地址 https://mosquitto.org/
Apache Apollo
Apollo也出身豪门Apache 基金会,但是Apollo项目其实并不能单纯的算MQTT协议的中间件,其代码是在ActiveMQ基础上发展而来的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, WebSockets 等多种通信协议。
官网地址 https://activemq.apache.org/
其实仅需要MQTT通信的话,Eclipse Mosquitto 就够用了,再加上其实Springboot 中对MQTT支持其实用的是Eclipse Paho 作为客户端程序的。所以最终选择在服务器上部署Eclipse Mosquitto 作为消息中间件。
Spring Boot Integration MQTT
gradle 依赖
dependencies {
implementation "org.springframework.boot:spring-boot-starter-integration"
implementation "org.springframework.integration:spring-integration-stream"
implementation "org.springframework.integration:spring-integration-mqtt"
}
从依赖就可以看出,其实spring对标准协议这块的支持模块化做的很优秀的。
MQTT Config
import org
.eclipse
.paho
.client
.mqttv3
.MqttConnectOptions
import org
.slf4j
.LoggerFactory
import org
.springframework
.beans
.factory
.annotation
.Autowired
import org
.springframework
.beans
.factory
.annotation
.Value
import org
.springframework
.context
.annotation
.Bean
import org
.springframework
.context
.annotation
.Configuration
import org
.springframework
.integration
.annotation
.ServiceActivator
import org
.springframework
.integration
.channel
.DirectChannel
import org
.springframework
.integration
.core
.MessageProducer
import org
.springframework
.integration
.mqtt
.core
.DefaultMqttPahoClientFactory
import org
.springframework
.integration
.mqtt
.core
.MqttPahoClientFactory
import org
.springframework
.integration
.mqtt
.inbound
.MqttPahoMessageDrivenChannelAdapter
import org
.springframework
.integration
.mqtt
.outbound
.MqttPahoMessageHandler
import org
.springframework
.integration
.mqtt
.support
.DefaultPahoMessageConverter
import org
.springframework
.messaging
.MessageChannel
import org
.springframework
.messaging
.MessageHandler
import org
.unreal
.hardware
.receiver
.handler
.MessageReceiverHandler
@Configuration
class MqttConfig
{
@Value("\${spring.mqtt.url}")
private lateinit var url
: String
@Value("\${spring.mqtt.producer.clientId}")
private lateinit var producerClientId
: String
@Value("\${spring.mqtt.producer.defaultTopic}")
private lateinit var producerDefaultTopic
: String
@Value("\${spring.mqtt.consumer.clientId}")
private lateinit var consumerClientId
: String
@Value("\${spring.mqtt.consumer.deviceStatusTopic}")
private lateinit var consumerDeviceStatusTopic
: String
@Value("\${spring.mqtt.timeout}")
private lateinit var timeout
: String
@Value("\${spring.mqtt.keepalive}")
private lateinit var keepalive
: String
@Autowired
private lateinit var messageReceiverHandler
: MessageReceiverHandler
private val logger
= LoggerFactory
.getLogger(MqttConfig
::class.java
)
val mqttConnectOptions
: MqttConnectOptions
@Bean
get() {
val options
= MqttConnectOptions()
options
.isCleanSession
= true
options
.serverURIs
= arrayOf(url
)
options
.connectionTimeout
= timeout
.toInt()
options
.keepAliveInterval
= keepalive
.toInt()
options
.setWill("/server/offline", WILL_DATA
, 2, false)
return options
}
@Bean
fun mqttClientFactory(): MqttPahoClientFactory
{
val factory
= DefaultMqttPahoClientFactory()
factory
.connectionOptions
= mqttConnectOptions
return factory
}
@Bean(name
= [CHANNEL_NAME_OUT
])
fun mqttOutboundChannel(): MessageChannel
{
return DirectChannel()
}
@Bean
@ServiceActivator(inputChannel
= CHANNEL_NAME_OUT
)
fun mqttOutbound(): MessageHandler
{
val messageHandler
= MqttPahoMessageHandler(
producerClientId
,
mqttClientFactory()
)
messageHandler
.setAsync(true)
messageHandler
.setDefaultTopic(producerDefaultTopic
)
return messageHandler
}
@Bean
fun inbound(): MessageProducer
{
val adapter
= MqttPahoMessageDrivenChannelAdapter(
consumerClientId
, mqttClientFactory(),
consumerDeviceStatusTopic
)
adapter
.setCompletionTimeout(5000)
adapter
.setConverter(DefaultPahoMessageConverter())
adapter
.setQos(1)
adapter
.outputChannel
= mqttInboundChannel()
return adapter
}
@Bean(name
= [CHANNEL_NAME_IN
])
fun mqttInboundChannel(): MessageChannel
{
return DirectChannel()
}
@Bean
@ServiceActivator(inputChannel
= CHANNEL_NAME_IN
)
fun handler(): MessageHandler
{
return messageReceiverHandler
}
companion object {
private val WILL_DATA
: ByteArray
= "offline".toByteArray()
const val CHANNEL_NAME_IN
= "mqttInputChannel"
const val CHANNEL_NAME_OUT
= "mqttOutboundChannel"
}
}
可以看到上面的一些参数实际上是在application.yml中配置的,所以也贴出application.yml文件中的配置
spring:
mqtt:
url: tcp
://192.168.0.9
:1883
timeout: 10
keepalive: 20
producer:
clientId: hardwareServerSender
defaultTopic: device
consumer:
clientId: hardwareServerReceiver
deviceStatusTopic: /server/device/status
以上 Mqtt 环境配置完成。
发送消息
@Component
@MessagingGateway(defaultRequestChannel
= MqttConfig
.CHANNEL_NAME_OUT
)
interface MqttSender
{
fun sendToMqtt(data: String
)
fun sendToMqtt(
@Header(MqttHeaders
.TOPIC
) topic
: String
,
payload
: String
)
fun sendToMqtt(
@Header(MqttHeaders
.TOPIC
) topic
: String
,
@Header(MqttHeaders
.QOS
) qos
: Int
,
payload
: String
)
}
调用发送时,仅需要将MqttSender的实例化对象注入到需要的地方,调用对应方法即可完成消息发送. SAMPLE:
@Service
class DeviceCommandServiceImpl
: DeviceCommandService
{
@Autowired
private lateinit var mqttSender
: MqttSender
fun openGate(sn
: String
) {
sendMQTTMessage(sn
, "(option:\"openGate\",time:\""+System
.currentTime()+"\"}")
}
}
订阅消息
在配置的时候,其实将消息处理的Handler 已经配置在MQTT中,所以仅需要实现消息处理的hander即可。
@Service
class MessageReceiverHandler
: MessageHandler
{
@Autowired
private lateinit var deviceStatusMessageProcessor
: DeviceStatusMessageProcessor
@Autowired
private lateinit var deviceCommandMessageProcessor
: DeviceCommandMessageProcessor
@Autowired
private lateinit var deviceParamsMessageProcessor
: DeviceParamsMessageProcessor
@Autowired
private lateinit var deviceNamelistMessageProcessor
: DeviceNamelistMessageProcessor
@Autowired
private lateinit var devicePrinterMessageProcessor
: DevicePrinterMessageProcessor
private val logger
= LoggerFactory
.getLogger(MessageReceiverHandler
::class.java
)
private val statusTopic
= "status"
override fun handleMessage(message
: Message
<*>) {
val topic
= message
.headers
["mqtt_receivedTopic"].toString()
val type
= topic
.substring(topic
.lastIndexOf("/") + 1, topic
.length
)
logger
.debug("MQTT handler receiver message type is ------> $type")
logger
.debug("MQTT handler receiver message is ------> ${message.payload}"
TODO("在此处完成业务代码即可")
}
}
总结
以上就完成了对MQTT协议的对接处理。其实主要就是发送 和接收 处理好这两部分就可以了。 代码不建议复制粘贴,自己动手写一下对代码的理解会更好。有问题可以通过站内信联系我,看到了一定会回复的!