ansible (2.4.2.0) A
短信预约 -IT技能 免费直播动态提醒
转自 jumpserver 5.0 测试版本 下载地址
https://github.com/hequan2017/zabbix-models/tree/master/ansible_run
只是简单改了一下 能够单独使用。
目录结构
ansible_run/
├── callback.py
├── exceptions.py
├── __init__.py
├── inventory.py
├── runner.py
├── test_inventory.py
└── test_runner.py
下面两个是演示文件
先 pip3 install ansible==2.4.2.0 安装
例子
# -*- coding: utf-8 -*-
from runner import AdHocRunner, CommandRunner
from inventory import BaseInventory
def TestAdHocRunner():
"""
以yml的形式 执行多个命令
:return:
"""
host_data = [
{
"hostname": "testserver",
"ip": "192.168.10.93",
"port": 22,
"username": "root",
"password": "password",
},
]
inventory = BaseInventory(host_data)
runner = AdHocRunner(inventory)
tasks = [
{"action": {"module": "cron","args": "name=\"sync time\" minute=\"*/3\" job=\"/usr/sbin/ntpdate time.nist.gov &> /dev/null\"" }, "name": "run_cmd"},
{"action": {"module": "shell", "args": "whoami"}, "name": "run_whoami"},
]
ret = runner.run(tasks, "all")
print(ret.results_summary)
print(ret.results_raw)
def TestCommandRunner():
"""
执行单个命令,返回结果
:return:
"""
host_data = [
{
"hostname": "testserver",
"ip": "192.168.10.93",
"port": 22,
"username": "root",
"password": "password",
},
]
inventory = BaseInventory(host_data)
runner = CommandRunner(inventory)
res = runner.execute('pwd', 'all')
print(res.results_command)
print(res.results_raw)
print(res.results_command['testserver']['stdout'])
if __name__ == "__main__":
TestAdHocRunner()
TestCommandRunner()
from inventory import BaseInventory
def Test():
"""
返回主机信息,组信息,组内主机信息
:return:
"""
host_list = [{
"hostname": "testserver1",
"ip": "102.1.1.1",
"port": 22,
"username": "root",
"password": "password",
"private_key": "/tmp/private_key",
"become": {
"method": "sudo",
"user": "root",
"pass": None,
},
"groups": ["group1", "group2"],
"vars": {"sexy": "yes"},
}, {
"hostname": "testserver2",
"ip": "8.8.8.8",
"port": 2222,
"username": "root",
"password": "password",
"private_key": "/tmp/private_key",
"become": {
"method": "su",
"user": "root",
"pass": "123",
},
"groups": ["group3", "group4"],
"vars": {"love": "yes"},
}]
inventory = BaseInventory(host_list=host_list)
print("#"*10 + "Hosts" + "#"*10)
for host in inventory.hosts:
print(host)
print("#" * 10 + "Groups" + "#" * 10)
for group in inventory.groups:
print(group)
print("#" * 10 + "all group hosts" + "#" * 10)
group = inventory.get_group('all')
print(group.hosts)
if __name__ == '__main__':
Test()
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341