蚊子的持久性不一致
知识点掌握了,还需要不断练习才能熟练运用。下面编程网给大家带来一个Golang开发实战,手把手教大家学习《蚊子的持久性不一致》,在实现功能的过程中也带大家重新温习相关知识点,温故而知新,回头看看说不定又有不一样的感悟!
问题内容我发现 mosquitto 上的消息传递与消息持久性和 qos=2 不一致。我有什么做错的地方吗?
我有一个简单的测试应用程序,它注册一个主题以使用 clientid="receive-client" 进行消费,但立即断开连接。然后,它以 clientid="send-client" 身份连接并发布 10 条消息,“消息 #1”...“消息 #10”。然后断开连接,等待五秒钟,然后再次连接到“receive-client”进行消费,同时打印和统计收到的消息。
结果不一致。有时我收到 6 条消息,有时收到 8 条消息。典型的输出如下所示:
warn[0005] got a message:message #1
warn[0005] got a message:message #2
warn[0005] got a message:message #3
warn[0005] got a message:message #4
warn[0005] got a message:message #5
warn[0005] got a message:message #6
warn[0005] got a message:message #7
warn[0005] got a message:message #8
warn[0305] pause
warn[0605] received message count=8
我的版本信息显示为1.4.15。我的 mosquitto.conf 是:
pid_file /var/run/mosquitto.pid
persistence true
persistence_location /var/lib/mosquitto/
allow_anonymous false
password_file /etc/mosquitto/passwd
log_dest file /var/log/mosquitto/mosquitto.log
最初,/var/lib/mosquitto/mosquitto.db 直到运行多次迭代后才显示。我的测试应用程序在这里:
import (
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
"time"
)
var receivedMsg int
func Persist() {
const TOPIC = "test"
const URL = "tcp://localhost:1883"
const USERNAME = "myuser"
const PASSWORD = "mypassword"
defer printReceived()
options := mqtt.NewClientOptions().AddBroker(URL).SetUsername(USERNAME).SetPassword(PASSWORD)
options.SetCleanSession(false)
options.SetConnectRetry(true)
options.SetConnectRetryInterval(10 * time.Millisecond)
// register the receive client with broker / TOPIC
// to be sure the broker knows it needs to save our messages
// to deliver at a later time
options.SetClientID("receive-client")
client := mqtt.NewClient(options)
token := client.Connect()
token.Wait()
if token := client.Subscribe(TOPIC, 2, consume1); token.Wait() && token.Error() != nil {
panic(token.Error())
}
client.Disconnect(0)
// connect with send client and send 10 messages
options.SetClientID("send-client")
client = mqtt.NewClient(options)
token = client.Connect()
token.Wait()
client.Publish(TOPIC, 2, false, "message #1")
client.Publish(TOPIC, 2, false, "message #2")
client.Publish(TOPIC, 2, false, "message #3")
client.Publish(TOPIC, 2, false, "message #4")
client.Publish(TOPIC, 2, false, "message #5")
client.Publish(TOPIC, 2, false, "message #6")
client.Publish(TOPIC, 2, false, "message #7")
client.Publish(TOPIC, 2, false, "message #8")
client.Publish(TOPIC, 2, false, "message #9")
client.Publish(TOPIC, 2, false, "message #10")
client.Disconnect(4)
time.Sleep(5* time.Second)
// subscribe again and try to retrieve the messages we missed
options.SetClientID("receive-client")
client = mqtt.NewClient(options)
token = client.Connect()
token.Wait()
if token := client.Subscribe(TOPIC, 2, consume2); token.Wait() && token.Error() != nil {
panic(token.Error())
}
time.Sleep(300 * time.Second)
log.Warn("PAUSE")
time.Sleep(300 * time.Second)
}
func consume1(client mqtt.Client, msg mqtt.Message) {
receivedMsg++
log.Warn("THIS SHOULD NOT BE CONSUMING ANY MESSAGES:", string(msg.Payload()))
}
func consume2(client mqtt.Client, msg mqtt.Message) {
receivedMsg++
log.Warn("GOT A MESSAGE:", string(msg.Payload()))
}
func printReceived() {
log.Warn("received message count=", receivedMsg)
}
解决方案
以 qos 2 进行发布是一个多步骤过程,因此最可能的原因是您在所有消息实际完成向代理的发布之前断开了发布客户端的连接。
您可能应该在循环中进行发布,并使用调用 client.publish()
时返回的令牌来等待它完成,然后再断开客户端连接。
例如如示例所示:
//Publish 5 messages to /go-mqtt/sample at qos 1 and wait for the receipt
//from the server after sending each message
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("go-mqtt/sample", 0, false, text)
token.Wait()
}
好了,本文到此结束,带大家了解了《蚊子的持久性不一致》,希望本文对你有所帮助!关注编程网公众号,给大家分享更多Golang知识!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341