Python如何实时获取任务请求对应的Nginx日志
这篇文章主要介绍“Python如何实时获取任务请求对应的Nginx日志”,在日常操作中,相信很多人在Python如何实时获取任务请求对应的Nginx日志问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Python如何实时获取任务请求对应的Nginx日志”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
需求描述
项目需求测试过程中,需要向Nginx服务器发送一些用例请求,然后查看对应的Nginx日志,判断是否存在特征内容,来判断任务是否执行成功。为了提升效率,需要将这一过程实现自动化。
实践环境
Python 3.6.5
代码设计与实现
#!/usr/bin/env python# -*- coding:utf-8 -*-'''@CreateTime: 2021/06/26 9:05@Author : shouke'''import timeimport threadingimport subprocessfrom collections import dequedef collect_nginx_log(): global nginx_log_queue global is_tasks_compete global task_status args = 'tail -0f /usr/local/openresty/nginx/logs/access.log' while task_status != 'req_log_got': with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc: log_for_req = '' outs, errs = '', '' try: outs, errs = proc.communicate(timeout=2) except subprocess.TimeoutExpired: print('获取nginx日志超时,正在重试') proc.kill() try: outs, errs = proc.communicate(timeout=5) except subprocess.TimeoutExpired: print('获取nginx日志超时,再次超时,停止重试') break finally: for line in outs.split('\n'): flag = '\"client_ip\":\"10.118.0.77\"' # 特征 if flag in line: # 查找包含特征内容的日志 log_for_req += line if task_status == 'req_finished': nginx_log_queue.append(log_for_req) task_status = 'req_log_got'def run_tasks(task_list): ''' 运行任务 :param task_list 任务列表 ''' global nginx_log_queue global is_tasks_compete global task_status for task in task_list: thread = threading.Thread(target=collect_nginx_log, name="collect_nginx_log") thread.start() time.sleep(1) # 执行任务前,让收集日志线程先做好准备 print('正在执行任务:%s' % task.get('name')) # 执行Nginx任务请求 # ... task_status = 'req_finished' time_to_wait = 0.1 while task_status != 'req_log_got': # 请求触发的nginx日志收集未完成 time.sleep(time_to_wait) time_to_wait += 0.01 else:# 获取到用例请求触发的nginx日志 if nginx_log_queue: nginx_log = nginx_log_queue.popleft() task_status = 'req_ready' # 解析日志 # do something here # ... else: print('存储请求日志的队列为空') # do something here # ...if __name__ == '__main__': nginx_log_queue = deque() is_tasks_compete = False # 所有任务是否执行完成 task_status = 'req_ready' # req_ready,req_finished,req_log_got # 存放执行次任务任务的一些状态 print('###########################任务开始###########################') tast_list = [{'name':'test_task', 'other':'...'}] run_tasks(tast_list) is_tasks_compete = True current_active_thread_num = len(threading.enumerate()) while current_active_thread_num != 1: time.sleep(2) current_active_thread_num = len(threading.enumerate()) print('###########################任务完成###########################')
注意:
上述代码为啥不一步到位,直接 tail -0f /usr/local/openresty/nginx/logs/access.log | grep "特征内容"
呢?这是因为这样做无法获取到Nginx的日志
实践时发现,第一次执行proc.communicate(timeout=2)
获取日志时,总是无法获取,会超时,需要二次获取,并且timeout
设置太小时(实践时尝试过设置为1秒
),也会导致第二次执行时无法获取Nginx日志。
到此,关于“Python如何实时获取任务请求对应的Nginx日志”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341