我的编程空间,编程开发者的网络收藏夹
学习永远不晚

python中的生成器实现周期性报文发送功能

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

python中的生成器实现周期性报文发送功能

使用python中的生成器实现周期性发送列表中数值的报文发送功能。

功能开发背景:提取cantest工具采集到的现场报文数据,希望使用原始的现场数据模拟验证程序现有逻辑,需要开发一个工具能够自动按照报文发送周期依次发送采集到的报文数据中的一个数值。

功能开发需求:多个报文发送对象共用同一个报文发送线程,多个对象间的报文发送周期不同,多个对象间的总报文发送数据长度不同,能够允许报文发送过程中断及恢复某个对象的报文发送。

功能开发实现逻辑:在固定发送对象某个数值的基础程序版本上增加新的功能,考虑使用python中生成器实现周期性提取对象数值发送报文的功能。

目前只需要发送两个对象的报文数据,先定义两个使用yield生成器:

    def yield_item_value_1(self):
        item_value_list = self.item_value_dict[item_list[0]]
        for i in range(len(item_value_list)):
            yield item_value_list[i]

    def yield_item_value_2(self):
        item_value_list = self.item_value_dict[item_list[1]]
        for i in range(len(item_value_list)):
            yield item_value_list[i]

报文发送线程中的run()函数:

    def run(self):
        # 实时更新item的被选状态
        self.get_checkbox_res_func()
        # 获取每个对象的实际物理值
        self.get_item_value_dict()
        self.item1_value_func = self.yield_item_value_1()
        self.item2_value_func = self.yield_item_value_2()
        while self.Flag:
            if any(msg_send_flag_dict.values()):
                # 每隔second秒执行func函数
                timer = Timer(0.01, self.tick_10ms_func)
                timer.start()
                self.send_working_msg(self.working_can_device, self.working_can_channel)
                timer.join()
            else:
                mes_info = "Goodbye *** 自动发送所有报文数据结束!!!"
                toastone = wx.MessageDialog(None, mes_info, "信息提示",
                                            wx.YES_DEFAULT | wx.ICON_QUESTION)
                if toastone.ShowModal() == wx.ID_YES:  # 如果点击了提示框的确定按钮
                    toastone.Destroy()  # 则关闭提示框
                break

报文周期性发送函数:

    def send_working_msg(self, can_device, device_id):
        for idx in range(len(item_list)):
            if msg_send_flag_dict[item_list[idx]] == 1:
                msg_id_idx = msg_operation_list.index("报文ID") - 1
                msg_id = eval(str(self.operation_dict[item_list[idx]][msg_id_idx]).strip())
                # 获取报文发送帧类型
                msg_type_idx = msg_operation_list.index("帧类型") - 1
                msg_type = str(self.operation_dict[item_list[idx]][msg_type_idx])
                msg_type = 1 if msg_type == "扩展帧" else 0
                # 获取报文发送周期
                msg_cycle_idx = msg_operation_list.index("周期(ms)") - 1
                msg_cycle = int(self.operation_dict[item_list[idx]][msg_cycle_idx])
                send_cycle = msg_cycle / 10
                if msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] >= send_cycle:
                    # 开始喂值
                    if idx == 0:
                        try:
                            item_phyValue = next(self.item1_value_func)
                        except StopIteration:
                            msg_send_flag_dict[item_list[idx]] = 0
                            continue
                    else:
                        try:
                            item_phyValue = next(self.item2_value_func)
                        except StopIteration:
                            msg_send_flag_dict[item_list[idx]] = 0
                            continue
                    msg_data = self.get_item_msg(item_list[idx], item_phyValue)
                    if send_msg(msg_id, msg_type, msg_data, can_device, device_id, 0):
                        print("发送报文成功")
                        # print("msg_data", msg_data)
                        msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] = 0
                    else:
                        pass
                        # print("发送报文失败")
                        # mes_info = "发送报文失败"
                        # toastone = wx.MessageDialog(None, mes_info, "信息提示",
                        #                             wx.YES_DEFAULT | wx.ICON_QUESTION)
                        # if toastone.ShowModal() == wx.ID_YES:  # 如果点击了提示框的确定按钮
                        #     toastone.Destroy()  # 则关闭提示框

功能实现逻辑的待优化点:存在多个对象就需要定义多个存储报文数据的生成器。

上述功能实现逻辑优化如下:

    def set_yield_func(self):
        item_yield_func_dict = dict()
        for i in range(len(item_list)):
            item_yield_func_dict[item_list[i]] = self.yield_item_value(i)
        return item_yield_func_dict

    def yield_item_value(self, item_idx):
        item_value_list = self.item_value_dict[item_list[item_idx]]
        for i in range(len(item_value_list)):
            yield item_value_list[i]

报文发送线程的run()函数中调用这个存储对象报文发送数据生成器的字典item_yield_func_dict:

    def run(self):
        # 实时更新item的被选状态
        self.get_checkbox_res_func()
        # 获取每个对象的实际物理值
        self.get_item_value_dict()
        self.item_yield_func_dict = self.set_yield_func()
        …………

从存储每个对象生成器的字典item_yield_func_dict中获取生成器对象:

                    try:
                        item_phyValue = next(self.item_yield_func_dict[item_list[idx]])
                    except StopIteration:
                        msg_send_flag_dict[item_list[idx]] = 0
                        continue

到此这篇关于python中的生成器实现周期性报文发送功能的文章就介绍到这了,更多相关python 周期性报文发送 内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

python中的生成器实现周期性报文发送功能

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

python中的生成器实现周期性报文发送功能

本文主要介绍了python中的生成器实现周期性报文发送功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2023-03-08

怎么使用python中的生成器实现周期性报文发送功能

这篇文章主要介绍了怎么使用python中的生成器实现周期性报文发送功能的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇怎么使用python中的生成器实现周期性报文发送功能文章都会有所收获,下面我们一起来看看吧。使
2023-07-05

Android开发中怎么实现一个长按将文章生成图片的功能

这篇文章将为大家详细讲解有关Android开发中怎么实现一个长按将文章生成图片的功能,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。长按菜单实现WebView可以如下实现:mWebView.s
2023-05-31

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录