apache druid 实时加载kafka 中的数据(一)
简介
apache druid 是分布式列存储的 OLAP 框架。还是一个时间序列数据库。本篇文章主要是druid 在kafka 加载数据的配置。由于druid 升级情况太快,本人的环境还是在0.13,主要改动方面还是UI,新的版本在UI方面更适合新手入门。
文章如有帮助,请关注微信公共号。
最终使用druid时,是0.9版本,当时在kafka加载数据推荐的方式是两种
-
Tranquility
-
kafka index service
Tranquility
是用于将流实时推送到Druid的工具包。是一个独立,需要单独下载。
** 其特点**
无缝地处理分区,复制,服务发现和架构过渡,而无需停机。集成了http server,Samza,Spark ,Storm,Flink 等工具。
可以自由的控制向druid,主动发送数据。
** 劣势**
本身具有时间窗,超过时间窗的数据直接丢弃。
版本落后,由于没有官方组织维护,目前版本只是兼容值0.9.2,后面druid升级后,Tranquility未及时升级,有些新的api 无法适配。
kafka index service
这是druid 自身携带的扩展插件,使用时,需要在common.runtime.properties 文件中的属性 druid.extensions.loadList 添加druid-kafka-indexing-service。
** 其特点**
支持实时查询按时间分segment,非实追加到对应时间的segment 。
通过算法把Peon分配到 不同的【 Middle Managers】上实现分布式
加大对应kafka的topic的partition数量 加大taskCount的值,产生更多的Peon
创建 supervisor
上面是一个完整的supervisor的内容,主要包含type,dataSchema,tuningConfig,ioConfig 四个部分
- type
标记类型,supervisor 的类型 就是kafka.
- dataSchema
数据库的配置,主要包含dataSource,parser,metricsSpec,granularitySpec
dataSource
druid的数据库名称。
parser
配置与解析数据。简单理解就是kafka中的数据与druid存储之间的关系映射。主要包含以下配置
timestampSpec
配置处于的位置 dataSchema->parser->timestampSpec
druid 本身是时间序列数据库,故此时间就是数据的主键。由于druid 在 0.9后,已经不支持设置时区了,时间都是采用的utc格式。druid查询时,可以设置时区。包括一些roll-up操作都是按照utc时间进行。如有必须需改动源码。
dimensionsSpec
位置:dataSchema->parser->dimensionsSpec
维度。数据库需要存储的字段,需要与kafka中的对应。
dimensions
是一个数组类型,默认字段的类型都是string
可以设置字段的类型,例如{ "type": "long", "name": "userId" }
metricsSpec
位置:dataSchema->metricsSpec
度量。此值roll-up 启用是才有意义。
`{ "name": "theta_customer_id",
"type": "thetaSketch",
"fieldName": "customer_id"
} `
name: druid中字段的名称。
type:指标类型。thetaSketch 去重。还支持doubleSum,longSum,doubleMin,doubleMax 等聚合类型。
fieldName:kafka中 属性的名称
granularitySpec
位置: dataSchema->granularitySpec
segmentGranularity: Segment粒度(SegmentGranularity)表示每一个实时索引任务中产生的Segment所涵盖的时间范围。
queryGranularity:查询粒度。例如 {"queryGranularity":"DAY"} 查询的最小粒度就是DAY,经过roll-up后,维度完全一样的数据,一天范围内将聚合为一条数据。
- tuningConfig
调优相关的配置。
配置一个segment大小。
调整压缩算法。
-
ioConfig
消费者的配置。对于kafak index service 就是kafka 消费者一个配置。
下面的实例,配置了kafka的topic,启动的任务数量,任务执行的时间,kafka的地址。
completionTimeout:这个值将发布任务声明为失败并终止之前等待的时间。如果设置得太低,您的任务可能永远不会发布。任务的发布时间大约在taskDuration过去之后开始。默认是30M,为防止任务未发布,调整为与任务时间一致(PT3600S)
"ioConfig": {
"topic": "com.test",
"replicas": 1, "taskCount": 1,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers": "10.0.0.1:9096,10.0.0.1:9096"
},
"completionTimeout": "PT3600S"
}
提交supervisor
提交至overlord节点。
新版中出现界面配置
第一种,根据界面的配置向导来加载kafka数据
访问:8888 端口
一直按照向导配置,就可以自动生成supervisor的配置 很方便。
第二种,通过页面 提供的Submit supervisor提交 相应的json文件
总结
简单介绍了下supervisor 重点配置的具体含义,由于篇幅问题,详细的配置还需要去官网文档中查看。本文的目的就是通过个人使用 kafka index service时一些新得,帮助新手能快速跑通第一个druid实例。
文章如有帮助,请关注微信公共号。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341