我的编程空间,编程开发者的网络收藏夹
学习永远不晚

试用时间序列数据库InfluxDB

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

试用时间序列数据库InfluxDB

Hadoop集群监控需要使用时间序列数据库,今天花了半天时间调研使用了一下最近比较火的InfluxDB,发现还真是不错,记录一下学习心得。


Influx是用Go语言写的,专为时间序列数据持久化所开发的,由于使用Go语言,所以各平台基本都支持。类似的时间序列数据库还有OpenTSDB,Prometheus等。

OpenTSDB很有名,性能也不错,但是基于HBase,要用那个还得先搭一套HBase,有点为了吃红烧肉自己得先去杀猪,烫皮,拔毛的感觉。Prometheus相关文档和讨论太少,而InfluxDB项目活跃,使用者多,文档也比较丰富,所以先看看这个。Influx可以说是LevelDB的Go语言修改版实现,LevelDB采用LSM引擎,效率很高,Influx是基于LSM引擎再修改的TSM引擎,专为时间序列设计。


InfluxDB 架构原理介绍

LevelDB 架构原理介绍


下午跟七牛的CrazyJVM聊了一下,因为七牛都是用Go,所以也大量部署了Influx给大型企业级用户使用,据说是全球最大的InfluxDB集群,七牛也给Influx提交了大量的Patch,结果Influx通过早期开源弄得差不多稳定了,突然就闭源了,这也太不局气了,然后搞Cluster功能收费,单机功能免费使用。


昨天看了一会文档,今天试用了一下,感觉很不错,值得推荐。把学习了解的内容记录下来,供爱好者参考,也省的自己时间长忘了。


InfluxDB其实不能说像哪个数据库,初上手感觉更像Mongo类型的NoSQL,但是有意思的是,它提供了类SQL接口,对开发人员十分友好。命令行查询结果的界面又有点像MySQL,挺有意思的。


不写安装部署和CLI接口,因为实在没得可写,直接yum或者apt就装了。service一启动,再influx命令就进命令行了,网上一大堆安装教程


InfluxDB有几个关键概念是需要了解的。

database:相当于RDBMS里面的库名。创建数据库的语句也十分相似。一进去就可以先创建一个数据库玩,加不加分号都行。

CREATE DATABASE 'hadoop'


然后需要创建一个用户,我省事点,直接创建一个最高权限,就看了一天,然后直接写REST接口去了,权限管理慢慢再细看。

CREATE USER "xianglei" WITH PASSWORD 'password' WITH ALL PRIVILEGES


使用查询语句插入一条数据

INSERT hdfs,hdfs=adh,path=/ free=2341234,used=51234123,nonhdfs=1234

Influx没有先建立schema的概念,因为Influx允许存储的数据是schemeless的,表在这里叫做measurement,数据在插入时如果没有表,则自动创建该表。

measurement: 相当于RDBMS里面的数据表名。

在上面的INSERT语句中,跟在insert后面的第一个hdfs就是measurement,如果不存在一个叫做hdfs的,就自动创建一个叫做hdfs的表,否则直接插入数据。

然后是tags,tags的概念类似于RDBMS里面的查询索引名,这里的tags是hdfs=adh和path=/,等于我建立了两个tags。

free往后统称叫fields,tags和fields之间用空格分开,tags和fields内部自己用逗号分开。tags和fields名称可以随意填写,主要是一开始设计好就行。

所以,对以上插入语句做一个注释的话,就是这样。

INSERT [hdfs(measurement)],[hdfs=adh,path=/(tags)] [free=2341234,used=51234123,nonhdfs=1234(fields)]


然后即可查询该数据

SELECT free FROM hdfs WHERE hdfs='adh' and path='/'
name: hdfs
time                    free
----                    ----
1485251656036494252     425234
1485251673348104714     425234

SELECT * FROM hdfs LIMIT 2
name: hdfs
time                  free    hdfs    nonhdfs  path   used
----                  ----    ----    -------  ----   ----
1485251656036494252   425234  adh     1341     /      23412
1485251673348104714   425234  adh     1341     /      23412

这里的where条件,即是上面tags里面的hdfs=adh和path=/,所以tags可以随意添加,但是在插入第一条数据的时候,最好先设计好你的查询条件。当然,你插入的任何数据,都会自动添加time列,数了数,应该是纳秒级的时间戳。



上面是Influx的基本概念和基本使用的记录,下面是接口开发的使用。以Tornado示例Restful查询接口。

Influx本身支持restful的HTTP API,python有直接封装的接口可以调用,直接 pip install influxdb即可

influxdb-python文档


Talk is cheap, show me your code.

Models  Influx模块,用于连接influxdb

class InfluxClient:
    def __init__(self):
        self._conf = ParseConfig()
        self._config = self._conf.load()
        self._server = self._config['influxdb']['server']
        self._port = self._config['influxdb']['port']
        self._user = self._config['influxdb']['username']
        self._pass = self._config['influxdb']['password']
        self._db = self._config['influxdb']['db']
        self._retention_days = self._config['influxdb']['retention']['days']
        self._retention_replica = self._config['influxdb']['retention']['replica']
        self._retention_name = self._config['influxdb']['retention']['name']
        self._client = InfluxDBClient(self._server, self._port, self._user, self._pass, self._db)

    def _create_database(self):
        try:
            self._client.create_database(self._db)
        except Exception, e:
            print e.message

    def _create_retention_policy(self):
        try:
            self._client.create_retention_policy(self._retention_name,
                                                 self._retention_days,
                                                 self._retention_replica,
                                                 default=True)
        except Exception, e:
            print e.message

    def _switch_user(self):
        try:
            self._client.switch_user(self._user, self._pass)
        except Exception, e:
            print e.message

    def write_points(self, data):
        self._create_database()
        self._create_retention_policy()
        if self._client.write_points(data):
            return True
        else:
            return False

    def query(self, qry):
        try:
            result = self._client.query(qry)
            return result
        except Exception, e:
            return e.message

连接influxdb的配置从项目的配置文件里读取,自己写也行。

Controller模块InfluxController

class InfluxRestController(tornado.web.RequestHandler):
    '''
    "GET"
        op=query&qry=select+used+from+hdfs+where+hdfs=adh
    '''
    查询方法,使用HTTP GET
    def get(self, *args, **kwargs):
        op = self.get_argument('op')
        #自己实现的python switch case,网上一大堆
        for case in switch(op):
            if case('query'):
                #查询语句从url参数获取
                qry = self.get_argument('qry')
                #实例化Models里面的class
                influx = InfluxClient()
                result = influx.query(qry)
                #返回结果为对象,通过raw属性获取对象中的字典。
                self.write(json.dumps(result.raw, ensure_ascii=False))
                break

            if case():
                self.write('No argument found')

    #写入数据,使用HTTP PUT
    def put(self):
        op = self.get_argument('op')
        for case in switch(op):
            if case('write'):
                #data should urldecode first and then turn into json
                data = json.loads(urllib.unquote(self.get_argument('data')))
                influx = InfluxClient()
                #写入成功或失败判断
                if influx.write_points(data):
                    self.write('{"result":true}')
                else:
                    self.write('{"result":false}')
                break
            if case():
                self.write('No argument found')

Tornado配置路由

applications = tornado.web.Application(
    [
        (r'/', IndexController),
        (r'/ws/api/influx', InfluxRestController)
    ],
    **settings
)

JSON项目配置文件

{
  "http_port": 19998,
  "influxdb":{
    "server": "47.88.6.247",
    "port": "8086",
    "username": "root",
    "password": "root",
    "db": "hadoop",
    "retention": {
      "days": "365d",
      "replica": 3,
      "name": "hound_policy"
    },
    "replica": 3
  },
  "copyright": "CopyLeft 2017 Xianglei"
}

插入测试

def test_write():
    base_url = 'http://localhost:19998/ws/api/influx'
    #data = '[{"measurement": "hdfs"},"tags":{"hdfs": "adh","path":"/user"},"fields":{"used": 234123412343423423,"free": 425234523462546546,"nonhdfs": 1341453452345}]'
    #构造插入数据
    body = dict()
    body['measurement'] = 'hdfs'
    body['tags'] = dict()
    body['tags']['hdfs'] = 'adh'
    body['tags']['path'] = '/'
    body['fields'] = dict()
    body['fields']['used'] = 234123
    body['fields']['free'] = 425234
    body['fields']['nonhdfs'] = 13414
    tmp = list()
    tmp.append(body)

    op = 'write'
    # dict data to json and urlencode
    data = urllib.urlencode({'op': op, 'data': json.dumps(tmp)})
    headers = {'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'}
    try:
        http = tornado.httpclient.HTTPClient()
        response = http.fetch(
            tornado.httpclient.HTTPRequest(
                url=base_url,
                method='PUT',
                headers=headers,
                body=data
            )
        )
        print response.body
    except tornado.httpclient.HTTPError, e:
        print e

test_write()


插入数据后通过访问http连接获取插入结果

curl -i "http://localhost:19998/ws/api/influx?op=query&qry=select%20*%20from%20hdfs"
HTTP/1.1 200 OK
Date: Tue, 24 Jan 2017 15:47:42 GMT
Content-Length: 1055
Etag: "7a2b1af6edd4f6d11f8b000de64050a729e8621e"
Content-Type: text/html; charset=UTF-8
Server: TornadoServer/4.4.2

{"values": [["2017-01-24T09:54:16.036494252Z", 425234, "adh", 13414, "/", 234123]], "name": "hdfs", "columns": ["time", "free", "hdfs", "nonhdfs", "path", "used"]}


收工,明天用React写监控前端

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

试用时间序列数据库InfluxDB

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

时序数据库InfluxDB的基本语法

一 了解InfluxDB的必要性时序数据库主要存放的数据Time series data is a series of data points each associated with a specific time. Examples include:Ser
时序数据库InfluxDB的基本语法
2015-06-18

什么是时间序列数据库?

时间序列数据库专门用于存储和分析时序数据,按时间顺序记录并包含时间戳和测量值。其关键特征包括时间顺序存储、可扩展性、高效查询、数据压缩和支持多种数据类型。优点包括快速数据检索、可扩展性、高可用性、高级分析和成本效益。典型用例涉及物联网、金融、制造、能源和医疗保健。流行的时间序列数据库包括InfluxDB、Prometheus、TimescaleDB、GrafanaLoki和Datadog。
什么是时间序列数据库?
2024-04-02

深入浅出:了解时序数据库 InfluxDB

时序数据库经常应用于机房运维监控、物联网IoT设备采集存储、互联网广告点击分析等基于时间线且多源数据连续涌入数据平台的应用场景,InfluxDB专为时序数据存储而生,尤其是在工业领域的智能制造,未来应用潜力巨大。数据模型 1.时序数据的特征时序数据应用场景就是
深入浅出:了解时序数据库 InfluxDB
2021-08-02

InfluxDB,TimescaleDB和QuestDB三种时序数据库的比较

在过去的十年间,我们亲历了关系型、非关系型、在线分析处理(OLAP)型、以及在线事务处理(OLTP)型数据库的市场之争,也注意到了诸如:Snowflake、MongoDB、Cockroach Labs、以及Neo4j等新型数据库的产生和发展。而根据DB-Eng
InfluxDB,TimescaleDB和QuestDB三种时序数据库的比较
2015-07-30

TimescaleDB比拼InfluxDB:如何选择合适的时序数据库?

时序数据已用于愈来愈多的应用中,包括物联网、DevOps、金融、零售、物流、石油自然气、制造业、汽车、太空、SaaS,乃至机器学习和人工智能。虽然当前时序数据库仅局限于采集度量和监控,可是软件开发人员已经逐渐明白,他们的确须要一款时序数据库,真正设计用于运行多
TimescaleDB比拼InfluxDB:如何选择合适的时序数据库?
2014-06-21

如何使用Redis序列化存储时间序列数据

在Redis中存储时间序列数据可以采用以下两种方式进行序列化:使用Redis的数据结构:可以将时间序列数据存储在Redis的有序集合(Sorted Set)中。在有序集合中,可以将时间作为分数(score),数据值作为成员(member)进
如何使用Redis序列化存储时间序列数据
2024-04-29

Redis的时间序列数据库功能怎么实现

要在Redis中实现时间序列数据库功能,可以利用Redis的数据结构和命令来存储和处理时间序列数据。以下是一种常见的实现方法:使用Redis的有序集合(Sorted Set)来存储时间序列数据。可以将时间戳作为有序集合的分数,将数据值作为成
Redis的时间序列数据库功能怎么实现
2024-05-07

Teradata如何处理时间序列数据和空间数据

Teradata是一个强大的数据仓库解决方案,可以处理各种类型的数据,包括时间序列数据和空间数据。在处理时间序列数据时,Teradata提供了丰富的函数和工具,可以对时间序列数据进行处理、分析和可视化。用户可以使用SQL语句来查询和操作时间
Teradata如何处理时间序列数据和空间数据
2024-04-09

Redis时间序列数据库功能的应用场景有哪些

监控和性能分析:Redis时间序列数据库可用于收集和存储系统性能指标,例如CPU利用率、内存使用量、网络流量等,以便进行实时监控和性能分析。日志分析:Redis时间序列数据库可用于存储和分析大量日志数据,帮助用户了解系统运行情况、排查问题和
Redis时间序列数据库功能的应用场景有哪些
2024-05-07

Cassandra如何处理时间序列数据

Cassandra是一个分布式数据库系统,通常用于处理大规模数据和高并发读写操作。在处理时间序列数据时,Cassandra可以使用以下一些方法:数据模型设计:在设计数据模型时,可以使用时间戳作为主键或者索引列,这样可以方便快速地检索和查询时
Cassandra如何处理时间序列数据
2024-04-09

Prometheus怎么存储时间序列数据

Prometheus 是一款开源的监控系统,通常用于存储和查询时间序列数据。它使用一种称为“metric”的数据模型来表示时间序列数据,每个 metric 都包含一个名称和一组键值对的标签。Prometheus 使用一种称为 TSDB(时间
Prometheus怎么存储时间序列数据
2024-04-09

编程热搜

目录