我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Python封装zabbix-get接口的代码分享

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Python封装zabbix-get接口的代码分享

Zabbix 是一款强大的开源网管监控工具,该工具的客户端与服务端是分开的,我们可以直接使用自带的zabbix_get命令来实现拉取客户端上的各种数据,在本地组装参数并使用Popen开子线程执行该命令,即可实现批量监测。

封装Engine类: 该类的主要封装了Zabbix接口的调用,包括最基本的参数收集.

import subprocess,datetime,time,math

class Engine():
    def __init__(self,address,port):
        self.address = address
        self.port = port

    def GetValue(self,key):
        try:
            command = "get.exe -s {0} -p {1} -k {2}".format(self.address,self.port,key).split(" ")
            start = datetime.datetime.now()
            process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
            while process.poll() is None:
                time.sleep(1)
                now = datetime.datetime.now()
                if (now - start).seconds > 2:
                    return 0
            return str(process.stdout.readlines()[0].split()[0],"utf-8")
        except Exception:
            return 0

    # ping检测
    def GetPing(self):
        ref_dict = {"Address":0,"Ping":0}
        ref_dict["Address"] = self.address
        ref_dict["Ping"] = self.GetValue("agent.ping")
        if ref_dict["Ping"] == "1":
            return ref_dict
        else:
            ref_dict["Ping"] = "0"
            return ref_dict
        return ref_dict

    # 获取主机组基本信息
    def GetSystem(self):
        ref_dict = { "Address" : 0 ,"HostName" : 0,"Uname":0 }
        ref_dict["Address"] = self.address
        ref_dict["HostName"] = self.GetValue("system.hostname")
        ref_dict["Uname"] = self.GetValue("system.uname")
        return ref_dict

    # 获取CPU利用率
    def GetCPU(self):
        ref_dict = { "Address": 0 ,"Core": 0,"Active":0 , "Avg1": 0 ,"Avg5":0 , "Avg15":0 }
        ref_dict["Address"] = self.address
        ref_dict["Core"] = self.GetValue("system.cpu.num")
        ref_dict["Active"] = math.ceil(float(self.GetValue("system.cpu.util")))
        ref_dict["Avg1"] = self.GetValue("system.cpu.load[,avg1]")
        ref_dict["Avg5"] = self.GetValue("system.cpu.load[,avg5]")
        ref_dict["Avg15"] = self.GetValue("system.cpu.load[,avg15]")
        return ref_dict

    # 获取内存利用率
    def GetMemory(self):
        ref_dict = { "Address":"0","Total":"0","Free":0,"Percentage":"0" }
        ref_dict["Address"] = self.address

        fps = self.GetPing()
        if fps['Ping'] != "0":
            ref_dict["Total"] = self.GetValue("vm.memory.size[total]")
            ref_dict["Free"] = self.GetValue("vm.memory.size[free]")
            # 计算百分比: percentage = 100 - int(Free/int(Total/100))
            ref_dict["Percentage"] = str( 100 - int( int(ref_dict.get("Free")) / (int(ref_dict.get("Total"))/100)) ) + "%"
            return ref_dict
        else:
            return ref_dict

    # 获取磁盘数据
    def GetDisk(self):
        ref_list = []

        fps = self.GetPing()
        if fps['Ping'] != "0":
            disk_ = eval( self.GetValue("vfs.fs.discovery"))
            for x in range(len(disk_)):
                dict_ = {"Address": 0, "Name": 0, "Type": 0, "Free": 0}
                dict_["Address"] = self.address
                dict_["Name"] = disk_[x].get("{#FSNAME}")
                dict_["Type"] = disk_[x].get("{#FSTYPE}")
                if dict_["Type"] != "UNKNOWN":
                    pfree = self.GetValue("vfs.fs.size[\"{0}\",pfree]".format(dict_["Name"]))
                    dict_["Free"] = str(math.ceil(float(pfree)))
                else:
                    dict_["Free"] = -1
                ref_list.append(dict_)
            return ref_list
        return ref_list

    # 获取进程状态
    def GetProcessStatus(self,process_name):
        fps = self.GetPing()
        dict_ = {"Address": '0', "ProcessName": '0', "ProcessCount": '0', "Status": '0'}
        if fps['Ping'] != "0":
            proc_id = self.GetValue("proc.num[\"{}\"]".format(process_name))
            dict_['Address'] = self.address
            dict_['ProcessName'] = process_name
            if proc_id != "0":
                dict_['ProcessCount'] = proc_id
                dict_['Status'] = "True"
            else:
                dict_['Status'] = "False"
            return dict_
        return dict_

    # 获取端口开放状态
    def GetNetworkPort(self,port):
        dict_ = {"Address": '0', "Status": 'False'}
        dict_['Address'] = self.address
        fps = self.GetPing()
        if fps['Ping'] != "0":
            port_ = self.GetValue("net.tcp.listen[{}]".format(port))
            if port_ == "1":
                dict_['Status'] = "True"
            else:
                dict_['Status'] = "False"
            return dict_
        return dict_

    # 检测Web服务器状态 通过本地地址:端口 => 检测目标地址:端口
    def CheckWebServerStatus(self,check_addr,check_port):
        dict_ = {"local_address": "0", "remote_address": "0", "remote_port": "0", "Status":"False"}
        fps = self.GetPing()
        dict_['local_address'] = self.address
        dict_['remote_address'] = check_addr
        dict_['remote_port'] = check_port
        if fps['Ping'] != "0":
            check_ = self.GetValue("net.tcp.port[\"{}\",\"{}\"]".format(check_addr,check_port))
            if check_ == "1":
                dict_['Status'] = "True"
            else:
                dict_['Status'] = "False"
            return dict_
        return dict_

当我们需要使用时,只需要定义变量调用即可,其调用代码如下。

from engine import Engine

if __name__ == "__main__":
    ptr_windows = Engine("127.0.0.1","10050")
    ret = ptr_windows.GetDisk()
    if len(ret) != 0:
        for item in ret:
            addr = item.get("Address")
            name = item.get("Name")
            type = item.get("Type")
            space = item.get("Free")
            if type != "UNKNOWN" and space != -1:
                print("地址: {} --> 盘符: {} --> 格式: {} --> 剩余空间: {}".format(addr,name,type,space))

到此这篇关于Python封装zabbix-get接口的代码分享的文章就介绍到这了,更多相关Python封装zabbix-get接口内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Python封装zabbix-get接口的代码分享

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

node.js实现微信JS-API封装接口的示例代码

Wechat JS-API接口 功能: 用于管理和获取微信 JSSDK 生产的access_token、jsapi_ticket和签名(signature) Installationnpm i wechat_interaction_jsap
2022-06-04

Java编程接口调用的作用及代码分享

很多JAVA初级程序员对于接口存在的意义很疑惑。不知道接口到底是有什么作用,为什么要定义接口。好像定义接口是提前做了个多余的工作。下面我给大家总结了4点关于JAVA中接口存在的意义:  1、重要性:在Java语言中, abstract cl
2023-05-30

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录