我的编程空间,编程开发者的网络收藏夹
学习永远不晚

使用Python实现tail的示例代码

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

使用Python实现tail的示例代码

前记

tail是一个常用的Linux命令, 它可以打印文件的后面n行数据, 也能实时输出文件的追加数据. tail的实现很简单,但是要实现一个完善的tail却需要考虑很多细节,如果要注重性能,则需要引入一些其他的机制. 一开始只是为了单纯的实现Linux的tail的基本功能,后面随着需要对日志文件的高性能读取则需要的Linux的inotify机制去完善. 相关源码见:https://github.com/so1n/example/tree/master/example_python/example_python/tail

1.第一版--从文件尾部读取实时数据

主要思路是: 打开文件, 把指针移动到文件最后, 然后有数据则输出数据, 无数据则休眠一段时间.

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            f.seek(0, 2)  # 从文件结尾处开始seek
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)


if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

之后只要做如下调用即可:

python xxx.py filename 

2.第二版--实现tail -f

tail -f默认先读取最后10行数据,再从文件尾部读取实时数据.如果对于小文件,可以先读取所有文件内容,并输出最后10行, 但是读取全文再获取最后10行的性能不高, 而从后滚10行的边界条件也很复杂, 先看先读取全文再获取最后10行的实现:

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            self.read_last_line(f)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, f):
        last_lines = f.readlines()[-10:]
        for line in last_lines:
            self.output(line)

if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()   

可以看到实现很简单, 相比第一版只多了个read_last_line的函数, 接下来就要解决性能的问题了, 当文件很大的时候, 这个逻辑是不行的, 特别是有些日志文件经常有几个G大, 如果全读出来内存就爆了. 而在Linux系统中, 没有一个接口可以指定指针跳到倒数10行, 只能使用如下方法来模拟输出倒数10行:

  • 首先游标跳转到最新的字符, 保存当前游标, 然后预估一行数据的字符长度, 最好偏多, 这里我按1024字符长度为一行来处理
  • 然后利用seek的方法,跳转到seek(-1024 * 10, 2)的字符, 这就是我们预估的倒数10行内的内容
  • 接着对内容进行判断, 如果跳转的字符长度小于 10 * 1024, 则证明整个文件没有10行, 则采用原来的read_last_line方法.
  • 如果跳转到字符长度等于1024 * 10, 则利用换行符计算已取字符长度共有多少行,如果行数大于10,那只输出最后10行,如果只读了4行,则继续读6*1024,直到读满10行为止

通过以上步奏, 就把倒数10行的数据计算好了可以打印出来, 可以进入追加数据了, 但是这时候文件内容可能发生改变了, 我们的游标也发生改变了, 这时候要把游标跳回到刚才保存的游标,防止漏打或者重复打印数据.

分析完毕后, 就可以开始重构read_last_line函数了.

import time
import sys

from typing import Callable, List, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

    def __call__(self, n: int = 10):
        with open(self.file_name) as f:
            self.read_last_line(f, n)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 跳转游标到最后
        file.seek(0, 2)
        # 获取当前结尾的游标位置
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
                file.seek(0) # 由于read方法是按照游标进行打印, 所以要重置游标
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新获取游标位置
                now_tell: int = file.tell()
                break
            # 跳转到我们预估的字符位置
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果获取的行数大于要求的行数,则获取前n行的行数
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游标,确保接下来打印的数据不重复
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

3.第三版--优雅的读取输出日志文件

可以发现实时读取那块的逻辑性能还是很差, 如果每秒读一次文件,实时性就太慢了,把间隔改小了,则处理器占用太多. 性能最好的情况是如果能得知文件更新再进行打印文件, 那性能就能得到保障了.庆幸的是,在Linux中inotify提供了这样的功能. 此外,日志文件有一个特点就是会进行logrotate,如果日志被logrotate了,那我们就需要重新打开文件,并进一步读取数据, 这种情况也可以利用到inotify, 当inotify获取到文件重新打开的事件时,我们就重新打开文件,再读取.

import os
import sys

from typing import Callable, List, NoReturn

import pyinotify

multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF  # 监控多个事件


class InotifyEventHandler(pyinotify.ProcessEvent):  # 定制化事件处理类,注意继承
    """
    执行inotify event的封装
    """
    f: 'open()'
    filename: str
    path: str
    wm: 'pyinotify.WatchManager'
    output: Callable

    def my_init(self, **kargs):
        """pyinotify.ProcessEvent要求不能直接继承__init__, 而是要重写my_init, 我们重写这一段并进行初始化"""

        # 获取文件
        filename: str = kargs.pop('filename')
        if not os.path.exists(filename):
            raise RuntimeError('Not Found filename')
        if '/' not in filename:
            filename = os.getcwd() + '/' + filename
        index = filename.rfind('/')
        if index == len(filename) - 1 or index == -1:
            raise RuntimeError('Not a legal path')

        self.f = None
        self.filename = filename
        self.output: Callable = kargs.pop('output')
        self.wm = kargs.pop('wm')
        # 只监控路径,这样就能知道文件是否移动
        self.path = filename[:index]
        self.wm.add_watch(self.path, multi_event)

    def read_line(self):
        """统一的输出方法"""
        for line in self.f.readlines():
            self.output(line)

    def process_IN_MODIFY(self, event):
        """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生变化, 进行文件读取"""
        if event.pathname == self.filename:
            self.read_line()

    def process_IN_MOVE_SELF(self, event):
        """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生重新打开, 进行文件读取"""
        if event.pathname == self.filename:
            # 检测到文件被移动重新打开文件
            self.f.close()
            self.f = open(self.filename)
            self.read_line()

    def __enter__(self) -> 'InotifyEventHandler':
        self.f = open(self.filename)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.f.close()


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

        wm = pyinotify.WatchManager()  # 创建WatchManager对象
        inotify_event_handler = InotifyEventHandler(
            **dict(filename=file_name, wm=wm, output=output)
        )  # 实例化我们定制化后的事件处理类, 采用**dict传参数
        wm.add_watch('/tmp', multi_event)  # 添加监控的目录,及事件
        self.notifier = pyinotify.Notifier(wm, inotify_event_handler)  # 在notifier实例化时传入,notifier会自动执行
        self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler

    def __call__(self, n: int = 10):
        """通过inotify的with管理打开文件"""
        with self.inotify_event_handle as i:
            # 先读取指定的行数
            self.read_last_line(i.f, n)
            # 启用inotify的监听
            self.notifier.loop()

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 获取当前结尾的游标位置
        file.seek(0, 2)
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
                file.seek(0)
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新获取游标位置
                now_tell: int = file.tell()
                break
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果获取的行数大于要求的行数,则获取前n行的行数
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游标,确保接下来打印的数据不重复
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

可以看到, 从原本的open打开文件改为用inotify打开文件(这时候会调用my_init方法进行初始化), 打开后还是运行我们打开原来n行的代码, 然后就交给inotify运行. 在inotify运行之前, 我们把重新打开文件方法和打印文件方法都挂载在inotifiy对应的事件里, 之后inotify运行时, 会根据对应的事件执行对应的方法.

到此这篇关于使用Python实现tail的示例代码的文章就介绍到这了,更多相关Python tail内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

使用Python实现tail的示例代码

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

使用Python实现tail的示例代码

tail是一个常用的Linux命令,它可以打印文件的后面n行数据,也能实时输出文件的追加数据。本文就来用Python实现tail,感兴趣的可以了解一下
2023-03-01

怎么使用Python实现tail

本篇内容介绍了“怎么使用Python实现tail”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1.第一版--从文件尾部读取实时数据主要思路是
2023-07-05

Python使用Qt5实现水平导航栏的示例代码

本文主要介绍了Python使用Qt5实现水平导航栏的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2023-03-06

Spring @Async 的使用与实现的示例代码

首先Spring AOP有两个重要的基础接口,Advisor和PointcutAdvisor,接口声明如下:Advisor接口声明:public interface Advisor { Advice getAdvice(); boole
2023-05-31

Python实现PING命令的示例代码

本文主要介绍了Python实现PING命令的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2023-01-06

python实现跳表SkipList的示例代码

跳表 跳表,又叫做跳跃表、跳跃列表,在有序链表的基础上增加了“跳跃”的功能,由William Pugh于1990年发布,设计的初衷是为了取代平衡树(比如红黑树)。 Redis、LevelDB 都是著名的 Key-Value 数据库,而Red
2022-06-02

Python实现PDF转MP3的示例代码

我们平常看到很多文件都是PDF格式,网上的各类书籍多为此格式。有时候不方便阅读,或者怕费眼睛伤颈椎,那么有没有一种方法可以把它变为音频,本文就来和大家详细讲讲
2023-05-18

python中Switch/Case实现的示例代码

学习Python过程中,发现没有switch-case,过去写C习惯用Switch/Case语句,官方文档说通过if-elif实现。所以不妨自己来实现Switch/Case功能。使用if…elif…elif…else 实现switch/ca
2022-06-04

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录