golang连接kafka的示例代码
短信预约 -IT技能 免费直播动态提醒
1 下载,配置,启动 kafka
下载链接
配置修改
在config目录下的server文件和zookeeper文件,其中分别修改kafka的日志保存路径和zookeeper的数据保存路径。
启动kafka
先启动kafka自带的zookeeper,在kafka的根目录下打开终端,使用配置文件启动
./bin/windows/zookeeper-server-start.bat config/zookeeper.properties
同样在kafka目录的根目录下启动kafka
./bin/windows/kafka-server-start.bat config/server.properties
2 使用golang的github.com/Shopify/sarama库连接kafka
package main
import (
"fmt"
"time"
"github.com/Shopify/sarama"
)
func main() {
config:=sarama.NewConfig()
// 生产者配置
config.Producer.RequiredAcks=sarama.WaitForAll
config.Producer.Partitioner=sarama.NewRandomPartitioner
config.Producer.Return.Successes=true
// 封装消息
msg:=&sarama.ProducerMessage{}
msg.Topic="shopping"
time_str:=time.Now().Format("2006-01-02 15:04:05")
msg.Value=sarama.StringEncoder("0413 test log!"+time_str)
// 连接kafka
client,err:=sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err!=nil {
fmt.Println("producer closed", err)
return
}
defer client.Close()
// 发送消息
partition,offset,err:=client.SendMessage(msg)
if err!=nil {
fmt.Println("send failed", err)
return
}
fmt.Printf("partition:%v offset:%v", partition, offset)
}
这段代码实现了模拟生产者向kafka发送消息的过程,包含:配置生产者,封装消息,消息类型是 *sarama.ProducerMessage
,连接kafka,默认端口是9092,发送消息,返回消息存储的partition和offset日志偏移量。
3 确认生产者发送成功
使用kafka自带的命令行消费者客户端查看kafka中的数据
在kafka的根目录下
bin/windows/kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic shopping --from-beginning
这里的topic和代码中的topic一致,均为shopping
终端会输出之前发送的数据。
到此这篇关于golang连接kafka的示例代码的文章就介绍到这了,更多相关golang连接kafka内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341