go+redis实现消息队列发布与订阅的详细过程
短信预约 -IT技能 免费直播动态提醒
在做项目过程中,实现websocket得时候,不知道哪里写的不太合适,客户端消息收到一定程度,剩下的消息收不到,修改了缓冲区大小,还是没有解决问题,后面因为项目结束期比较紧张,没有时间调试消息的时候,改用了redis队列去做了消息得暂存,客户端轮询去拿对应的消息。
1.生产者随机发布消息,用rpush发布。
2.消费者用lpop订阅消费,一旦没有消息,随机休眠。
redis做消息队列的缺点:没有持久化。一旦消息没有人消费,积累到一定程度后就会丢失
package main
import (
"fmt"
"time"
"os"
"strconv"
"math/rand"
"github.com/gomodule/redigo/redis"
)
const RMQ string = "mqtest"
func producer() {
redis_conn, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("hdiot"))
if err != nil {
fmt.Println(err)
return
}
defer redis_conn.Close()
rand.Seed(time.Now().UnixNano())
var i = 1
for {
_,err = redis_conn.Do("rpush", RMQ, strconv.Itoa(i))
if(err!=nil) {
fmt.Println("produce error")
continue
}
fmt.Println("produce element:%d", i)
time.Sleep(time.Duration(rand.Intn(10))*time.Second)
i++
}
}
func consumer() {
redis_conn, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("hdiot"))
if err != nil {
fmt.Println(err)
return
}
defer redis_conn.Close()
rand.Seed(time.Now().UnixNano())
for {
ele,err := redis.String(redis_conn.Do("lpop", RMQ))
if(err != nil) {
fmt.Println("no msg.sleep now")
time.Sleep(time.Duration(rand.Intn(10))*time.Second)
} else {
fmt.Println("cosume element:%s", ele)
}
}
}
func main() {
list := os.Args
if(list[1] == "pro") {
go producer()
} else if (list[1] == "con") {
go consumer()
}
for {
time.Sleep(time.Duration(10000)*time.Second)
}
}
到此这篇关于go+redis实现消息队列发布与订阅的详细过程的文章就介绍到这了,更多相关go redis消息队列内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341